metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [3/6] metron git commit: METRON-1136 Track Master in Feature Branch (ottobackwards) closes apache/metron#752
Date Wed, 13 Sep 2017 02:58:44 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
new file mode 100644
index 0000000..f1a0ec4
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import ch.hsr.geohash.WGS84Point;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class GeoHashFunctionsTest {
+  public static WGS84Point empireStatePoint = new WGS84Point(40.748570, -73.985752);
+  public static WGS84Point mosconeCenterPoint = new WGS84Point(37.782891, -122.404166);
+  public static WGS84Point jutlandPoint = new WGS84Point(57.64911, 10.40740);
+  public static String explicitJutlandHash = "u4pruydqmvpb";
+  String empireStateHash = (String)StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", empireStatePoint.getLatitude()
+                                              ,"long",empireStatePoint.getLongitude()
+                                              )
+    );
+  String mosconeCenterHash = (String)StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", mosconeCenterPoint.getLatitude()
+                                              ,"long",mosconeCenterPoint.getLongitude()
+                                              )
+    );
+  String jutlandHash = (String)StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", jutlandPoint.getLatitude()
+                                              ,"long",jutlandPoint.getLongitude()
+                                              )
+  );
+
+  @Test
+  public void testToLatLong_happypath() throws Exception {
+    Map<String, Object> latLong = (Map<String, Object>)StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
+            , ImmutableMap.of("hash", explicitJutlandHash ) );
+    Assert.assertEquals(jutlandPoint.getLatitude(), (double)latLong.get("latitude"), 1e-3);
+    Assert.assertEquals(jutlandPoint.getLongitude(), (double)latLong.get("longitude"), 1e-3);
+  }
+
+  @Test
+  public void testToLatLong_degenerate() throws Exception {
+    {
+      Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
+              , ImmutableMap.of("hash", "u"));
+      Assert.assertFalse(Double.isNaN((double) latLong.get("latitude")));
+      Assert.assertFalse(Double.isNaN((double) latLong.get("longitude")));
+    }
+    {
+      Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
+              , ImmutableMap.of("hash", ""));
+      Assert.assertEquals(0d, (double)latLong.get("latitude"), 1e-3);
+      Assert.assertEquals(0d, (double)latLong.get("longitude"), 1e-3);
+    }
+    {
+      Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(null)"
+              , new HashMap<>());
+      Assert.assertNull(latLong);
+    }
+  }
+
+  @Test
+  public void testHash_fromlatlong() throws Exception {
+    Assert.assertEquals("u4pruydqmv", StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 10)"
+                             , ImmutableMap.of("lat", jutlandPoint.getLatitude()
+                                              ,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+
+    Assert.assertEquals("u4pruydqmvpb", StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", jutlandPoint.getLatitude()
+                                              ,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+    Assert.assertEquals("u4pruydqmv".substring(0, 6), StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 6)"
+                             , ImmutableMap.of("lat", jutlandPoint.getLatitude()
+                                              ,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat)"
+                             , ImmutableMap.of("lat", jutlandPoint.getLatitude()
+                                              )
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 10)"
+                             , ImmutableMap.of("lat", "blah"
+                                              ,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+  }
+
+  @Test
+  public void testHash_fromLocation() throws Exception {
+    Map<String, String> loc = ImmutableMap.of( "latitude", "" + jutlandPoint.getLatitude()
+                                             , "longitude","" + jutlandPoint.getLongitude()
+                                                                     );
+    Assert.assertEquals("u4pruydqmv", StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 10)"
+                             , ImmutableMap.of("loc", loc
+                                              )
+                             )
+    );
+
+    Assert.assertEquals("u4pruydqmv".substring(0, 6), StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 6)"
+                             , ImmutableMap.of("loc", loc
+                                              )
+                             )
+    );
+
+    Assert.assertEquals("u4pruydqmvpb", StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc)"
+                             , ImmutableMap.of("loc", loc
+                                              )
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc)"
+                                               , ImmutableMap.of("loc", ImmutableMap.of( "latitude", "57.64911" ))
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 10)"
+                                                , ImmutableMap.of("loc", ImmutableMap.of( "latitude", "blah"
+                                                                                        , "longitude","10.40740"
+                                                                     )
+                                              )
+
+                             )
+    );
+  }
+
+  @Test
+  public void testDistanceHaversine() throws Exception {
+    testDistance(Optional.empty());
+    testDistance(Optional.of("HAVERSINE"));
+  }
+
+  @Test
+  public void testDistanceLawOfCosines() throws Exception {
+    testDistance(Optional.of("LAW_OF_COSINES"));
+  }
+
+  @Test
+  public void testDistanceLawOfVicenty() throws Exception {
+    testDistance(Optional.of("VICENTY"));
+  }
+
+  @Test
+  public void testMaxDistance_happyPath() throws Exception {
+    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([empireState, mosconeCenter, jutland])"
+            , ImmutableMap.of("empireState", empireStateHash
+                    , "mosconeCenter", mosconeCenterHash
+                    , "jutland", jutlandHash
+            )
+    );
+    double expectedDistance = 8528;
+    Assert.assertEquals(expectedDistance, maxDistance, 1d);
+  }
+
+  @Test
+  public void testMaxDistance_differentOrder() throws Exception {
+    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, mosconeCenter, empireState])"
+            , ImmutableMap.of("empireState", empireStateHash
+                    , "mosconeCenter", mosconeCenterHash
+                    , "jutland", jutlandHash
+            )
+    );
+    double expectedDistance = 8528;
+    Assert.assertEquals(expectedDistance, maxDistance, 1d);
+  }
+
+  @Test
+  public void testMaxDistance_withNulls() throws Exception {
+    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, mosconeCenter, empireState, null])"
+            , ImmutableMap.of("empireState", empireStateHash
+                    , "mosconeCenter", mosconeCenterHash
+                    , "jutland", jutlandHash
+            )
+    );
+    double expectedDistance = 8528;
+    Assert.assertEquals(expectedDistance, maxDistance, 1d);
+  }
+  @Test
+  public void testMaxDistance_allSame() throws Exception {
+    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, jutland, jutland])"
+            , ImmutableMap.of( "jutland", jutlandHash )
+    );
+    Assert.assertEquals(0, maxDistance, 1e-6d);
+  }
+
+  @Test
+  public void testMaxDistance_emptyList() throws Exception {
+    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([])" , new HashMap<>() );
+    Assert.assertTrue(Double.isNaN(maxDistance));
+  }
+
+  @Test
+  public void testMaxDistance_nullList() throws Exception {
+    Double maxDistance = (Double) StellarProcessorUtils.run("GEOHASH_MAX_DIST(null)" , new HashMap<>() );
+    Assert.assertNull(maxDistance);
+  }
+
+  @Test
+  public void testMaxDistance_invalidList() throws Exception {
+    Double maxDistance = (Double) StellarProcessorUtils.run("GEOHASH_MAX_DIST()" , new HashMap<>() );
+    Assert.assertNull(maxDistance);
+  }
+
+  public void testDistance(Optional<String> method) throws Exception {
+    double expectedDistance = 4128; //in kilometers
+    Map<String, Object> vars = ImmutableMap.of("empireState", empireStateHash, "mosconeCenter", mosconeCenterHash);
+    //ensure that d(x, y) == d(y, x) and that both are the same as the expected (up to 1 km accuracy)
+    {
+      String stellarStatement = getDistStellarStatement(ImmutableList.of("mosconeCenter", "empireState"), method);
+      Assert.assertEquals(expectedDistance, (double) StellarProcessorUtils.run(stellarStatement , vars ), 1D );
+    }
+    {
+      String stellarStatement = getDistStellarStatement(ImmutableList.of("empireState", "mosconeCenter"), method);
+      Assert.assertEquals(expectedDistance, (double) StellarProcessorUtils.run(stellarStatement , vars ), 1D );
+    }
+  }
+
+  private static String getDistStellarStatement(List<String> hashVariables, Optional<String> method) {
+    if(method.isPresent()) {
+      List<String> vars = new ArrayList<>();
+      vars.addAll(hashVariables);
+      vars.add("\'" + method.get() + "\'");
+      return "GEOHASH_DIST(" + Joiner.on(",").skipNulls().join(vars) + ")";
+    }
+    else {
+      return "GEOHASH_DIST(" + Joiner.on(",").skipNulls().join(hashVariables) + ")";
+    }
+  }
+
+  @Test
+  public void testCentroid_List() throws Exception {
+    //happy path
+    {
+      double expectedLong = -98.740087 //calculated via http://www.geomidpoint.com/ using the center of gravity or geographic midpoint.
+         , expectedLat = 41.86921
+         ;
+      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState, mosconeCenter]))"
+              , ImmutableMap.of("empireState", empireStateHash, "mosconeCenter", mosconeCenterHash)
+      );
+      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+    }
+    //same point
+    {
+      double expectedLong = empireStatePoint.getLongitude()
+         , expectedLat = empireStatePoint.getLatitude()
+         ;
+      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState, empireState]))"
+              , ImmutableMap.of("empireState", empireStateHash)
+      );
+      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+    }
+    //one point
+    {
+      double expectedLong = empireStatePoint.getLongitude()
+         , expectedLat = empireStatePoint.getLatitude()
+         ;
+      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState]))"
+              , ImmutableMap.of("empireState", empireStateHash)
+      );
+      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+    }
+    //no points
+    {
+      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([]))"
+              , new HashMap<>()
+      );
+      Assert.assertNull(centroid);
+    }
+  }
+
+  @Test
+  public void testCentroid_weighted() throws Exception {
+    //happy path
+    {
+      double expectedLong = -98.740087 //calculated via http://www.geomidpoint.com/ using the center of gravity or geographic midpoint.
+         , expectedLat = 41.86921
+         ;
+      for(int weight = 1;weight < 10;++weight) {
+        Map<Object, Integer> weightedPoints = ImmutableMap.of(empireStateHash, weight, mosconeCenterHash, weight);
+        Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
+                , ImmutableMap.of("weightedPoints", weightedPoints)
+        );
+        Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+        Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+      }
+    }
+    //same point
+    {
+      double expectedLong = empireStatePoint.getLongitude()
+         , expectedLat = empireStatePoint.getLatitude()
+         ;
+      for(int weight = 1;weight < 10;++weight) {
+        Map<Object, Integer> weightedPoints = ImmutableMap.of(empireStateHash, weight);
+        Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
+                , ImmutableMap.of("weightedPoints", weightedPoints)
+        );
+        Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+        Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+      }
+    }
+    //no points
+    {
+      Map<Object, Integer> weightedPoints = new HashMap<>();
+      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
+                , ImmutableMap.of("weightedPoints", weightedPoints)
+        );
+      Assert.assertNull(centroid);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase-client/pom.xml b/metron-platform/metron-hbase-client/pom.xml
index 5dd6127..1237be7 100644
--- a/metron-platform/metron-hbase-client/pom.xml
+++ b/metron-platform/metron-hbase-client/pom.xml
@@ -80,6 +80,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>org.apache.commons.logging</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml
index c64c374..7d07665 100644
--- a/metron-platform/metron-indexing/pom.xml
+++ b/metron-platform/metron-indexing/pom.xml
@@ -222,6 +222,16 @@
                         <configuration>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
index ddb88e5..4f47a65 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.indexing.dao;
 
-import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 
 import java.util.HashMap;
@@ -26,6 +25,7 @@ import java.util.function.Supplier;
 
 public class AccessConfig {
   private Integer maxSearchResults;
+  private Integer maxSearchGroups;
   private Supplier<Map<String, Object>> globalConfigSupplier;
   private Map<String, String> optionalSettings = new HashMap<>();
   private TableProvider tableProvider = null;
@@ -55,6 +55,18 @@ public class AccessConfig {
   }
 
  /**
   * The maximum number of groups a group-by request may return.
   * @return the configured maximum, or null if not configured
   */
  public Integer getMaxSearchGroups() {
    return maxSearchGroups;
  }

  public void setMaxSearchGroups(Integer maxSearchGroups) {
    this.maxSearchGroups = maxSearchGroups;
  }
+
+  /**
    * Get optional settings for initializing indices.
    * @return
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
index a1cf398..c890544 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -65,6 +67,11 @@ public class HBaseDao implements IndexDao {
   }
 
   @Override
  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
    // Grouping is not implemented for the HBase-backed DAO; returning null
    // signals "no result" to callers (composite DAOs skip null responses).
    return null;
  }
+
+  @Override
   public synchronized void init(AccessConfig config) {
     if(this.tableInterface == null) {
       this.config = config;

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 350e402..745dccd 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.flipkart.zjsonpatch.JsonPatch;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -33,7 +35,6 @@ import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import java.io.IOException;
 import org.apache.metron.indexing.dao.search.FieldType;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,6 +50,8 @@ public interface IndexDao {
    */
   SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException;
 
  /**
   * Group search results according to the given request.
   * @param groupRequest the query, fields and ordering to group by
   * @return the grouping results, or null if this DAO does not support grouping
   * @throws InvalidSearchException if the request cannot be executed
   */
  GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException;
+
   /**
    * Initialize the DAO with the AccessConfig object.
    * @param config

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index e9a4a9a..61c6231 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -22,6 +22,8 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -118,6 +120,17 @@ public class MultiIndexDao implements IndexDao {
   }
 
   @Override
  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
    // Delegate to each wrapped DAO in order and return the first non-null
    // response; DAOs that do not support grouping return null and are skipped.
    for(IndexDao dao : indices) {
      GroupResponse s = dao.group(groupRequest);
      if(s != null) {
        return s;
      }
    }
    // No wrapped DAO produced a grouping result.
    return null;
  }
+
+  @Override
   public void init(AccessConfig config) {
     for(IndexDao dao : indices) {
       dao.init(config);

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
new file mode 100644
index 0000000..be02026
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
/**
 * A single grouping criterion for a group-by search request: the field to
 * group on plus the ordering applied to the resulting groups.
 */
public class Group {

  private GroupOrder order;
  private String field;

  /** Defaults to ordering groups by term, descending. */
  public Group() {
    order = new GroupOrder();
    order.setGroupOrderType(GroupOrderType.TERM.toString());
    order.setSortOrder(SortOrder.DESC.toString());
  }

  public GroupOrder getOrder() {
    return order;
  }

  public void setOrder(GroupOrder order) {
    this.order = order;
  }

  public String getField() {
    return field;
  }

  public void setField(String field) {
    this.field = field;
  }
}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
new file mode 100644
index 0000000..b90c438
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
/**
 * Ordering applied to the groups produced by a group-by request: the key to
 * order on (term or count, see {@link GroupOrderType}) and the sort direction.
 */
public class GroupOrder {

  private SortOrder sortOrder;
  private GroupOrderType groupOrderType;

  public SortOrder getSortOrder() {
    return sortOrder;
  }

  // Setters accept Strings and convert via fromString rather than taking the
  // enum directly — presumably so request deserialization can bind raw string
  // values; confirm against the request-parsing code.
  public void setSortOrder(String sortOrder) {
    this.sortOrder = SortOrder.fromString(sortOrder);
  }

  public GroupOrderType getGroupOrderType() {
    return groupOrderType;
  }

  public void setGroupOrderType(String groupOrderType) {
    this.groupOrderType = GroupOrderType.fromString(groupOrderType);
  }
}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
new file mode 100644
index 0000000..8444e50
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Locale;
+
+public enum GroupOrderType {
+
+  @JsonProperty("count")
+  COUNT("count"),
+  @JsonProperty("term")
+  TERM("term");
+
+  private String groupOrderType;
+
+  GroupOrderType(String groupOrderType) {
+    this.groupOrderType = groupOrderType;
+  }
+
+  public String getGroupOrderType() {
+    return groupOrderType;
+  }
+
+  public static GroupOrderType fromString(String groupOrderType) {
+    return GroupOrderType.valueOf(groupOrderType.toUpperCase());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
new file mode 100644
index 0000000..121da10
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.util.List;
+import java.util.Optional;
+
+public class GroupRequest {
+
+  private List<String> indices;
+  private String query;
+  private String scoreField;
+  private List<Group> groups;
+
+  public List<String> getIndices() {
+    return indices;
+  }
+
+  public void setIndices(List<String> indices) {
+    this.indices = indices;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public Optional<String> getScoreField() {
+    return scoreField == null ? Optional.empty() : Optional.of(scoreField);
+  }
+
+  public void setScoreField(String scoreField) {
+    this.scoreField = scoreField;
+  }
+
+  public List<Group> getGroups() {
+    return groups;
+  }
+
+  public void setGroups(List<Group> groups) {
+    this.groups = groups;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
new file mode 100644
index 0000000..1b42609
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.util.List;
+
+public class GroupResponse {
+
+  private String groupedBy;
+  private List<GroupResult> groupResults;
+
+  public String getGroupedBy() {
+    return groupedBy;
+  }
+
+  public void setGroupedBy(String groupedBy) {
+    this.groupedBy = groupedBy;
+  }
+
+  public List<GroupResult> getGroupResults() {
+    return groupResults;
+  }
+
+  public void setGroupResults(List<GroupResult> groupResults) {
+    this.groupResults = groupResults;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
new file mode 100644
index 0000000..d40f146
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao.search;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.List;
+
+public class GroupResult {
+
+  private String key;
+  private long total;
+  private Double score;
+  private String groupedBy;
+  private List<GroupResult> groupResults;
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  public long getTotal() {
+    return total;
+  }
+
+  public void setTotal(long total) {
+    this.total = total;
+  }
+
+  public Double getScore() {
+    return score;
+  }
+
+  public void setScore(Double score) {
+    this.score = score;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getGroupedBy() {
+    return groupedBy;
+  }
+
+  public void setGroupedBy(String groupedBy) {
+    this.groupedBy = groupedBy;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public List<GroupResult> getGroupResults() {
+    return groupResults;
+  }
+
+  public void setGroupResults(List<GroupResult> groups) {
+    this.groupResults = groups;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index 2d146e0..6e48b58 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -22,7 +22,6 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.*;
 import org.apache.metron.indexing.dao.update.Document;
@@ -77,6 +76,27 @@ public class InMemoryDao implements IndexDao {
     return ret;
   }
 
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
+    GroupResponse groupResponse = new GroupResponse();
+    groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
+    groupResponse.setGroupResults(getGroupResults(groupRequest.getGroups(), 0));
+    return groupResponse;
+  }
+
+  private List<GroupResult> getGroupResults(List<Group> groups, int index) {
+    Group group = groups.get(index);
+    GroupResult groupResult = new GroupResult();
+    groupResult.setKey(group.getField() + "_value");
+    if (index < groups.size() - 1) {
+      groupResult.setGroupedBy(groups.get(index + 1).getField());
+      groupResult.setGroupResults(getGroupResults(groups, index + 1));
+    } else {
+      groupResult.setScore(50.0);
+    }
+    groupResult.setTotal(10);
+    return Collections.singletonList(groupResult);
+  }
 
   private static class ComparableComparator implements Comparator<Comparable>  {
     SortOrder order = null;

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 2645df2..0db8e37 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -20,10 +20,13 @@ package org.apache.metron.indexing.dao;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.GroupResult;
 import org.apache.metron.integration.InMemoryComponent;
 import org.junit.After;
 import org.junit.Assert;
@@ -40,11 +43,11 @@ import java.util.Map;
 public abstract class SearchIntegrationTest {
   /**
    * [
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"}
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"}
    * ]
    */
   @Multiline
@@ -52,11 +55,11 @@ public abstract class SearchIntegrationTest {
 
   /**
    * [
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5}
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5}
    * ]
    */
   @Multiline
@@ -149,7 +152,7 @@ public abstract class SearchIntegrationTest {
 
   /**
    * {
-   * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"],
+   * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "score", "is_alert"],
    * "indices": ["bro", "snort"],
    * "query": "*",
    * "from": 0,
@@ -253,6 +256,63 @@ public abstract class SearchIntegrationTest {
   @Multiline
   public static String noResultsFieldsQuery;
 
+  /**
+   * {
+   * "groups": [
+   *   {
+   *     "field":"is_alert"
+   *   },
+   *   {
+   *     "field":"latitude"
+   *   }
+   * ],
+   * "scoreField":"score",
+   * "indices": ["bro", "snort"],
+   * "query": "*"
+   * }
+   */
+  @Multiline
+  public static String groupByQuery;
+
+  /**
+   * {
+   * "groups": [
+   *   {
+   *     "field":"is_alert",
+   *     "order": {
+   *       "groupOrderType": "count",
+   *       "sortOrder": "ASC"
+   *     }
+   *   },
+   *   {
+   *     "field":"ip_src_addr",
+   *     "order": {
+   *       "groupOrderType": "term",
+   *       "sortOrder": "DESC"
+   *     }
+   *   }
+   * ],
+   * "indices": ["bro", "snort"],
+   * "query": "*"
+   * }
+   */
+  @Multiline
+  public static String sortedGroupByQuery;
+
+  /**
+   * {
+   * "groups": [
+   *   {
+   *     "field":"location_point"
+   *   }
+   * ],
+   * "indices": ["bro", "snort"],
+   * "query": "*"
+   * }
+   */
+  @Multiline
+  public static String badGroupQuery;
+
   protected static IndexDao dao;
   protected static InMemoryComponent indexComponent;
 
@@ -387,14 +447,18 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001);
       Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0)));
       Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1)));
-      Map<String, Long> doubleFieldCounts = facetCounts.get("double_field");
-      Assert.assertEquals(2, doubleFieldCounts.size());
-      List<String> doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet());
-      Collections.sort(doubleFieldKeys);
-      Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001);
-      Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001);
-      Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0)));
-      Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1)));
+      Map<String, Long> scoreFieldCounts = facetCounts.get("score");
+      Assert.assertEquals(4, scoreFieldCounts.size());
+      List<String> scoreFieldKeys = new ArrayList<>(scoreFieldCounts.keySet());
+      Collections.sort(scoreFieldKeys);
+      Assert.assertEquals(10.0, Double.parseDouble(scoreFieldKeys.get(0)), 0.00001);
+      Assert.assertEquals(20.0, Double.parseDouble(scoreFieldKeys.get(1)), 0.00001);
+      Assert.assertEquals(50.0, Double.parseDouble(scoreFieldKeys.get(2)), 0.00001);
+      Assert.assertEquals(98.0, Double.parseDouble(scoreFieldKeys.get(3)), 0.00001);
+      Assert.assertEquals(new Long(4), scoreFieldCounts.get(scoreFieldKeys.get(0)));
+      Assert.assertEquals(new Long(2), scoreFieldCounts.get(scoreFieldKeys.get(1)));
+      Assert.assertEquals(new Long(3), scoreFieldCounts.get(scoreFieldKeys.get(2)));
+      Assert.assertEquals(new Long(1), scoreFieldCounts.get(scoreFieldKeys.get(3)));
       Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
       Assert.assertEquals(2, isAlertCounts.size());
       Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
@@ -440,7 +504,7 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(FieldType.LONG, broTypes.get("long_field"));
       Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp"));
       Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field"));
+      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert"));
       Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point"));
       Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
@@ -453,7 +517,7 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field"));
       Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp"));
       Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field"));
+      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert"));
       Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point"));
       Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
@@ -486,7 +550,7 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
       Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
       Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
       Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
     }
@@ -527,6 +591,145 @@ public abstract class SearchIntegrationTest {
       SearchResponse response = dao.search(request);
       Assert.assertEquals(0, response.getTotal());
     }
+    // Group by test case, default order is count descending
+    {
+      GroupRequest request = JSONUtils.INSTANCE.load(groupByQuery, GroupRequest.class);
+      GroupResponse response = dao.group(request);
+      Assert.assertEquals("is_alert", response.getGroupedBy());
+      List<GroupResult> isAlertGroups = response.getGroupResults();
+      Assert.assertEquals(2, isAlertGroups.size());
+
+      // isAlert == true group
+      GroupResult trueGroup = isAlertGroups.get(0);
+      Assert.assertEquals("true", trueGroup.getKey());
+      Assert.assertEquals(6, trueGroup.getTotal());
+      Assert.assertEquals("latitude", trueGroup.getGroupedBy());
+      Assert.assertEquals(198.0, trueGroup.getScore(), 0.00001);
+      List<GroupResult> trueLatitudeGroups = trueGroup.getGroupResults();
+      Assert.assertEquals(2, trueLatitudeGroups.size());
+
+
+      // isAlert == true && latitude == 48.5839 group
+      GroupResult trueLatitudeGroup2 = trueLatitudeGroups.get(0);
+      Assert.assertEquals(48.5839, Double.parseDouble(trueLatitudeGroup2.getKey()), 0.00001);
+      Assert.assertEquals(5, trueLatitudeGroup2.getTotal());
+      Assert.assertEquals(148.0, trueLatitudeGroup2.getScore(), 0.00001);
+
+      // isAlert == true && latitude == 48.0001 group
+      GroupResult trueLatitudeGroup1 = trueLatitudeGroups.get(1);
+      Assert.assertEquals(48.0001, Double.parseDouble(trueLatitudeGroup1.getKey()), 0.00001);
+      Assert.assertEquals(1, trueLatitudeGroup1.getTotal());
+      Assert.assertEquals(50.0, trueLatitudeGroup1.getScore(), 0.00001);
+
+      // isAlert == false group
+      GroupResult falseGroup = isAlertGroups.get(1);
+      Assert.assertEquals("false", falseGroup.getKey());
+      Assert.assertEquals("latitude", falseGroup.getGroupedBy());
+      Assert.assertEquals(130.0, falseGroup.getScore(), 0.00001);
+      List<GroupResult> falseLatitudeGroups = falseGroup.getGroupResults();
+      Assert.assertEquals(2, falseLatitudeGroups.size());
+
+      // isAlert == false && latitude == 48.5839 group
+      GroupResult falseLatitudeGroup2 = falseLatitudeGroups.get(0);
+      Assert.assertEquals(48.5839, Double.parseDouble(falseLatitudeGroup2.getKey()), 0.00001);
+      Assert.assertEquals(3, falseLatitudeGroup2.getTotal());
+      Assert.assertEquals(80.0, falseLatitudeGroup2.getScore(), 0.00001);
+
+      // isAlert == false && latitude == 48.0001 group
+      GroupResult falseLatitudeGroup1 = falseLatitudeGroups.get(1);
+      Assert.assertEquals(48.0001, Double.parseDouble(falseLatitudeGroup1.getKey()), 0.00001);
+      Assert.assertEquals(1, falseLatitudeGroup1.getTotal());
+      Assert.assertEquals(50.0, falseLatitudeGroup1.getScore(), 0.00001);
+    }
+    // Group by with sorting test case where is_alert is sorted by count ascending and ip_src_addr is sorted by term descending
+    {
+      GroupRequest request = JSONUtils.INSTANCE.load(sortedGroupByQuery, GroupRequest.class);
+      GroupResponse response = dao.group(request);
+      Assert.assertEquals("is_alert", response.getGroupedBy());
+      List<GroupResult> isAlertGroups = response.getGroupResults();
+      Assert.assertEquals(2, isAlertGroups.size());
+
+      // isAlert == false group
+      GroupResult falseGroup = isAlertGroups.get(0);
+      Assert.assertEquals(4, falseGroup.getTotal());
+      Assert.assertEquals("ip_src_addr", falseGroup.getGroupedBy());
+      List<GroupResult> falseIpSrcAddrGroups = falseGroup.getGroupResults();
+      Assert.assertEquals(4, falseIpSrcAddrGroups.size());
+
+      // isAlert == false && ip_src_addr == 192.168.1.8 group
+      GroupResult falseIpSrcAddrGroup1 = falseIpSrcAddrGroups.get(0);
+      Assert.assertEquals("192.168.1.8", falseIpSrcAddrGroup1.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup1.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup1.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup1.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.7 group
+      GroupResult falseIpSrcAddrGroup2 = falseIpSrcAddrGroups.get(1);
+      Assert.assertEquals("192.168.1.7", falseIpSrcAddrGroup2.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup2.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup2.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup2.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.6 group
+      GroupResult falseIpSrcAddrGroup3 = falseIpSrcAddrGroups.get(2);
+      Assert.assertEquals("192.168.1.6", falseIpSrcAddrGroup3.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup3.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup3.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup3.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.2 group
+      GroupResult falseIpSrcAddrGroup4 = falseIpSrcAddrGroups.get(3);
+      Assert.assertEquals("192.168.1.2", falseIpSrcAddrGroup4.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup4.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup4.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup4.getGroupResults());
+
+      // isAlert == true group
+      GroupResult trueGroup = isAlertGroups.get(1);
+      Assert.assertEquals(6, trueGroup.getTotal());
+      Assert.assertEquals("ip_src_addr", trueGroup.getGroupedBy());
+      List<GroupResult> trueIpSrcAddrGroups = trueGroup.getGroupResults();
+      Assert.assertEquals(4, trueIpSrcAddrGroups.size());
+
+      // isAlert == true && ip_src_addr == 192.168.1.5 group
+      GroupResult trueIpSrcAddrGroup1 = trueIpSrcAddrGroups.get(0);
+      Assert.assertEquals("192.168.1.5", trueIpSrcAddrGroup1.getKey());
+      Assert.assertEquals(1, trueIpSrcAddrGroup1.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup1.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup1.getGroupResults());
+
+      // isAlert == true && ip_src_addr == 192.168.1.4 group
+      GroupResult trueIpSrcAddrGroup2 = trueIpSrcAddrGroups.get(1);
+      Assert.assertEquals("192.168.1.4", trueIpSrcAddrGroup2.getKey());
+      Assert.assertEquals(1, trueIpSrcAddrGroup2.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup2.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup2.getGroupResults());
+
+      // isAlert == true && ip_src_addr == 192.168.1.3 group
+      GroupResult trueIpSrcAddrGroup3 = trueIpSrcAddrGroups.get(2);
+      Assert.assertEquals("192.168.1.3", trueIpSrcAddrGroup3.getKey());
+      Assert.assertEquals(1, trueIpSrcAddrGroup3.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup3.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup3.getGroupResults());
+
+      // isAlert == true && ip_src_addr == 192.168.1.1 group
+      GroupResult trueIpSrcAddrGroup4 = trueIpSrcAddrGroups.get(3);
+      Assert.assertEquals("192.168.1.1", trueIpSrcAddrGroup4.getKey());
+      Assert.assertEquals(3, trueIpSrcAddrGroup4.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup4.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup4.getGroupResults());
+    }
+    //Bad group query
+    {
+      GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class);
+      try {
+        dao.group(request);
+        Assert.fail("Exception expected, but did not come.");
+      }
+      catch(InvalidSearchException ise) {
+        Assert.assertEquals("Could not execute search", ise.getMessage());
+      }
+    }
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/pom.xml b/metron-platform/metron-management/pom.xml
index 4117d69..a5cae38 100644
--- a/metron-platform/metron-management/pom.xml
+++ b/metron-platform/metron-management/pom.xml
@@ -205,6 +205,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 7574418..bf3342c 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -82,7 +82,15 @@ public class ConfigurationFunctionsTest {
       "parserConfig" : { },
       "fieldTransformations" : [ ],
       "readMetadata":false,
-      "mergeMetadata":false
+      "mergeMetadata":false,
+      "parserParallelism" : 1,
+      "errorWriterParallelism" : 1,
+      "spoutNumTasks" : 1,
+      "stormConfig" : {},
+      "errorWriterNumTasks":1,
+      "spoutConfig":{},
+      "parserNumTasks":1,
+      "spoutParallelism":1
     }
    */
   @Multiline

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index ea4f1dd..d4a984d 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -103,6 +103,17 @@ then it is assumed to be a regex and will match any topic matching the pattern (
 * `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (`false` by default).  See below for a discussion about metadata.
 * `parserConfig` : A JSON Map representing the parser implementation specific configuration.
 * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic.
+* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden on the command line.
+* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line.
+* `parserParallelism` : The parser bolt parallelism (default to `1`). This can be overridden on the command line.
+* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). This can be overridden on the command line.
+* `errorWriterParallelism` : The error writer bolt parallelism (default to `1`). This can be overridden on the command line.
+* `errorWriterNumTasks` : The number of tasks for the error writer bolt (default to `1`). This can be overridden on the command line.
+* `numWorkers` : The number of workers to use in the topology (default is the storm default of `1`).
+* `numAckers` : The number of acker executors to use in the topology (default is the storm default of `1`).
+* `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line.
+* `securityProtocol` : The security protocol to use for reading from kafka (this is a string).  This can be overridden on the command line and also specified in the spout config via the `security.protocol` key.  If both are specified, then they are merged and the CLI will take precedence.
+* `stormConfig` : The storm config to use (this is a map).  This can be overridden on the command line.  If both are specified, they are merged with CLI properties taking precedence.
 
 The `fieldTransformations` is a complex object which defines a
 transformation which can be done to a message.  This transformation can 

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml
index b3b5657..b80343a 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -276,6 +276,16 @@
                         <configuration>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.fasterxml.jackson</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 53d3d99..6c0dc28 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -18,9 +18,11 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
+import org.apache.storm.Config;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
@@ -47,34 +49,57 @@ import java.util.*;
  */
 public class ParserTopologyBuilder {
 
+  public static class ParserTopology {
+    private TopologyBuilder builder;
+    private Config topologyConfig;
+
+    private ParserTopology(TopologyBuilder builder, Config topologyConfig) {
+      this.builder = builder;
+      this.topologyConfig = topologyConfig;
+    }
+
+
+    public TopologyBuilder getBuilder() {
+      return builder;
+    }
+
+    public Config getTopologyConfig() {
+      return topologyConfig;
+    }
+  }
+
   /**
    * Builds a Storm topology that parses telemetry data received from an external sensor.
    *
    * @param zookeeperUrl             Zookeeper URL
    * @param brokerUrl                Kafka Broker URL
    * @param sensorType               Type of sensor
-   * @param spoutParallelism         Parallelism hint for the spout
-   * @param spoutNumTasks            Number of tasks for the spout
-   * @param parserParallelism        Parallelism hint for the parser bolt
-   * @param parserNumTasks           Number of tasks for the parser bolt
-   * @param errorWriterParallelism   Parallelism hint for the bolt that handles errors
-   * @param errorWriterNumTasks      Number of tasks for the bolt that handles errors
-   * @param kafkaSpoutConfig         Configuration options for the kafka spout
+   * @param spoutParallelismSupplier         Supplier for the parallelism hint for the spout
+   * @param spoutNumTasksSupplier            Supplier for the number of tasks for the spout
+   * @param parserParallelismSupplier        Supplier for the parallelism hint for the parser bolt
+   * @param parserNumTasksSupplier           Supplier for the number of tasks for the parser bolt
+   * @param errorWriterParallelismSupplier   Supplier for the parallelism hint for the bolt that handles errors
+   * @param errorWriterNumTasksSupplier      Supplier for the number of tasks for the bolt that handles errors
+   * @param kafkaSpoutConfigSupplier         Supplier for the configuration options for the kafka spout
+   * @param securityProtocolSupplier         Supplier for the security protocol
+   * @param outputTopic                      The output kafka topic
+   * @param stormConfigSupplier              Supplier for the storm config
    * @return A Storm topology that parses telemetry data received from an external sensor
    * @throws Exception
    */
-  public static TopologyBuilder build(String zookeeperUrl,
+  public static ParserTopology build(String zookeeperUrl,
                                       Optional<String> brokerUrl,
                                       String sensorType,
-                                      int spoutParallelism,
-                                      int spoutNumTasks,
-                                      int parserParallelism,
-                                      int parserNumTasks,
-                                      int errorWriterParallelism,
-                                      int errorWriterNumTasks,
-                                      Map<String, Object> kafkaSpoutConfig,
-                                      Optional<String> securityProtocol,
-                                      Optional<String> outputTopic
+                                      ValueSupplier<Integer> spoutParallelismSupplier,
+                                      ValueSupplier<Integer> spoutNumTasksSupplier,
+                                      ValueSupplier<Integer> parserParallelismSupplier,
+                                      ValueSupplier<Integer> parserNumTasksSupplier,
+                                      ValueSupplier<Integer> errorWriterParallelismSupplier,
+                                      ValueSupplier<Integer> errorWriterNumTasksSupplier,
+                                      ValueSupplier<Map> kafkaSpoutConfigSupplier,
+                                      ValueSupplier<String> securityProtocolSupplier,
+                                      Optional<String> outputTopic,
+                                      ValueSupplier<Config> stormConfigSupplier
   ) throws Exception {
 
 
@@ -82,6 +107,14 @@ public class ParserTopologyBuilder {
     // fetch configuration from zookeeper
     ParserConfigurations configs = new ParserConfigurations();
     SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, sensorType, configs);
+    int spoutParallelism = spoutParallelismSupplier.get(parserConfig, Integer.class);
+    int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class);
+    int parserParallelism = parserParallelismSupplier.get(parserConfig, Integer.class);
+    int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class);
+    int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class);
+    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class);
+    Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
+    Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
@@ -103,7 +136,7 @@ public class ParserTopologyBuilder {
               .shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
 
-    return builder;
+    return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, Config.class));
   }
 
   /**
@@ -243,16 +276,16 @@ public class ParserTopologyBuilder {
    * @throws Exception
    */
   private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, String sensorType, ParserConfigurations configs) throws Exception {
-    CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
-    client.start();
-    ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
-    SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
-    if (parserConfig == null) {
-      throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
-              "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
+    try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) {
+      client.start();
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
+      SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
+      if (parserConfig == null) {
+        throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
+                "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
+      }
+      return parserConfig;
     }
-    client.close();
-    return parserConfig;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 8ff4f93..b5ee628 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -18,10 +18,14 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.metron.common.Constants;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -235,12 +239,19 @@ public class ParserTopologyCLI {
       return has(cli)?cli.getOptionValue(shortCode):def;
     }
 
-    public static Config getConfig(CommandLine cli) {
-      Config config = new Config();
+    public static Optional<Config> getConfig(CommandLine cli) {
+      return getConfig(cli, new Config());
+    }
+
+    public static Optional<Config> getConfig(CommandLine cli, Config config) {
+      if(EXTRA_OPTIONS.has(cli)) {
+        Map<String, Object> extraOptions = readJSONMapFromFile(new File(EXTRA_OPTIONS.get(cli)));
+        config.putAll(extraOptions);
+      }
       for(ParserOptions option : ParserOptions.values()) {
         config = option.configHandler.apply(new Arg(config, option.get(cli)));
       }
-      return config;
+      return config.isEmpty()?Optional.empty():Optional.of(config);
     }
 
     public static CommandLine parse(CommandLineParser parser, String[] args) throws ParseException {
@@ -273,65 +284,172 @@ public class ParserTopologyCLI {
     }
   }
 
+  private static CommandLine parse(Options options, String[] args) {
+    CommandLineParser parser = new PosixParser();
+    try {
+      return ParserOptions.parse(parser, args);
+    } catch (ParseException pe) {
+      pe.printStackTrace();
+      final HelpFormatter usageFormatter = new HelpFormatter();
+      usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  public ParserTopologyBuilder.ParserTopology createParserTopology(final CommandLine cmd) throws Exception {
+    String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
+    Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
+    String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+
+    /*
+    It bears mentioning why we're creating this ValueSupplier indirection here.
+    As a separation of responsibilities, the CLI class defines the order of precedence
+    for the various topological and structural properties for creating a parser.  This is
+    desirable because there are now (i.e. integration tests)
+    and may be in the future (i.e. a REST service to start parsers without using the CLI)
+    other mechanisms to construct parser topologies.  It's sensible to split those concerns..
+
+    Unfortunately, determining the structural parameters for a parser requires interacting with
+    external services (e.g. zookeeper) that are set up well within the ParserTopology class.
+    Rather than pulling the infrastructure to interact with those services out and moving it into the
+    CLI class and breaking that separation of concerns, we've created a supplier
+    indirection where are providing the logic as to how to create precedence in the CLI class
+    without owning the responsibility of constructing the infrastructure where the values are
+    necessarily supplied.
+
+     */
+    ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
+    };
+    ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
+    };
+    ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
+    };
+
+    ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
+    };
+
+    ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
+    };
+
+    ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
+    };
+
+    ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
+        return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>());
+    };
+
+    ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
+      Optional<String> sp = Optional.empty();
+      if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
+        sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd));
+      }
+      if (!sp.isPresent()) {
+        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class));
+      }
+      return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
+    };
+
+    ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
+      Map<String, Object> c = parserConfig.getStormConfig();
+      Config finalConfig = new Config();
+      if(c != null && !c.isEmpty()) {
+        finalConfig.putAll(c);
+      }
+      if(parserConfig.getNumAckers() != null) {
+        Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
+      }
+      if(parserConfig.getNumWorkers() != null) {
+        Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+      }
+      return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
+    };
+
+    Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
+
+    return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic);
+  }
+
+  protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl
+                                                                  , Optional<String> brokerUrl
+                                                                  , String sensorType
+                                                                  , ValueSupplier<Integer> spoutParallelism
+                                                                  , ValueSupplier<Integer> spoutNumTasks
+                                                                  , ValueSupplier<Integer> parserParallelism
+                                                                  , ValueSupplier<Integer> parserNumTasks
+                                                                  , ValueSupplier<Integer> errorParallelism
+                                                                  , ValueSupplier<Integer> errorNumTasks
+                                                                  , ValueSupplier<Map> spoutConfig
+                                                                  , ValueSupplier<String> securityProtocol
+                                                                  , ValueSupplier<Config> stormConf
+                                                                  , Optional<String> outputTopic
+                                                                  ) throws Exception
+  {
+    return ParserTopologyBuilder.build(zookeeperUrl,
+                brokerUrl,
+                sensorType,
+                spoutParallelism,
+                spoutNumTasks,
+                parserParallelism,
+                parserNumTasks,
+                errorParallelism,
+                errorNumTasks,
+                spoutConfig,
+                securityProtocol,
+                outputTopic,
+                stormConf
+        );
+  }
+
+
   public static void main(String[] args) {
-    Options options = new Options();
 
     try {
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = null;
-      try {
-        cmd = ParserOptions.parse(parser, args);
-      } catch (ParseException pe) {
-        pe.printStackTrace();
-        final HelpFormatter usageFormatter = new HelpFormatter();
-        usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
-        System.exit(-1);
-      }
+      Options options = new Options();
+      final CommandLine cmd = parse(options, args);
       if (cmd.hasOption("h")) {
         final HelpFormatter usageFormatter = new HelpFormatter();
         usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
         System.exit(0);
       }
-      String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);;
-      Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
+      ParserTopologyCLI cli = new ParserTopologyCLI();
+      ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd);
       String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
-      int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
-      int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
-      int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
-      int parserNumTasks= Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
-      int errorParallelism = Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
-      int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
-      int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
-      int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
-      Map<String, Object> spoutConfig = new HashMap<>();
-      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
-        spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
-      }
-      Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
-      Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
-      securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
-      TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
-              brokerUrl,
-              sensorType,
-              spoutParallelism,
-              spoutNumTasks,
-              parserParallelism,
-              parserNumTasks,
-              errorParallelism,
-              errorNumTasks,
-              spoutConfig,
-              securityProtocol,
-              outputTopic
-      );
-      Config stormConf = ParserOptions.getConfig(cmd);
       if (ParserOptions.TEST.has(cmd)) {
-        stormConf.put(Config.TOPOLOGY_DEBUG, true);
+        topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(sensorType, stormConf, builder.createTopology());
+        cluster.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology());
         Utils.sleep(300000);
         cluster.shutdown();
       } else {
-        StormSubmitter.submitTopology(sensorType, stormConf, builder.createTopology());
+        StormSubmitter.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -347,13 +465,13 @@ public class ParserTopologyCLI {
     if(!ret.isPresent()) {
       ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
     }
-    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) {
       ret = Optional.empty();
     }
     return ret;
   }
 
-  private static Map<String, Object> readSpoutConfig(File inputFile) {
+  private static Map<String, Object> readJSONMapFromFile(File inputFile) {
     String json = null;
     if (inputFile.exists()) {
       try {


Mime
View raw message