metron-commits mailing list archives

From nickal...@apache.org
Subject incubator-metron git commit: METRON-392 Allow User to Define Custom 'Group By' for a Profile (nickwallen) closes apache/incubator-metron#230
Date Tue, 06 Sep 2016 22:23:29 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 47bc9b6de -> 8b623b87f


METRON-392 Allow User to Define Custom 'Group By' for a Profile (nickwallen) closes apache/incubator-metron#230


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/8b623b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/8b623b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/8b623b87

Branch: refs/heads/master
Commit: 8b623b87f5754edfa0f1ef27b9b740af7b4f6f2e
Parents: 47bc9b6
Author: nickwallen <nick@nickallen.org>
Authored: Tue Sep 6 18:22:50 2016 -0400
Committer: Nick Allen <nick@nickallen.org>
Committed: Tue Sep 6 18:22:50 2016 -0400

----------------------------------------------------------------------
 metron-analytics/metron-profiler/README.md      |  91 ++++++-
 .../metron/profiler/ProfileMeasurement.java     |  21 +-
 .../profiler/bolt/ProfileBuilderBolt.java       |  21 +-
 .../profiler/bolt/ProfileHBaseMapper.java       | 110 +++++++--
 .../profiler/bolt/ProfileSplitterBolt.java      |  15 +-
 .../stellar/DefaultStellarExecutor.java         |  34 ++-
 .../profiler/stellar/StellarExecutor.java       |  15 +-
 .../profiler/bolt/ProfileHBaseMapperTest.java   | 239 +++++++++++++++++++
 .../util/DefaultStellarExecutorTest.java        |  27 ++-
 .../configuration/profiler/ProfileConfig.java   |  44 +++-
 .../org/apache/metron/common/dsl/Context.java   |  25 +-
 .../functions/StellarStatisticsFunctions.java   |   1 -
 12 files changed, 543 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index f1ba045..fd02b18 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -12,14 +12,89 @@ Any field contained within a message can be used to generate a profile.  A profi
 
 The Profiler configuration requires a JSON-formatted set of elements, many of which can contain Stellar code.  The configuration contains the following elements.
 
-| Name 	    |            	| Description 	|
-|---	    |---	        |---	        |
-| profile  	| Required   	| A unique name identifying the profile.  The field is treated as a string. |
-| foreach  	| Required  	| A separate profile is maintained *for each* of these.  This is effectively the entity that the profile is describing.  The field is expected to contain a Stellar expression whose result is the entity name.  For example, if `ip_src_addr` then a separate profile would be maintained for each unique IP source address in the data; 10.0.0.1, 10.0.0.2, etc. | 
-| onlyif  	| Optional  	| An expression that determines if a message should be applied to the profile.  A Stellar expression is expected that when executed returns a boolean.  A message is only applied to a profile if this condition is true. This allows a profile to filter the messages that it receives. |
-| init  	| Optional  	| A set of expressions that is executed at the start of a window period.  A map is expected where the key is the variable name and the value is a Stellar expression.  The map can contain 0 or more variables/expressions. At the start of each window period the expression is executed once and stored in a variable with the given name. |
-| update  	| Required  	| A set of expressions that is executed when a message is applied to the profile.  A map is expected where the key is the variable name and the value is a Stellar expression.  The map can include 0 or more variables/expressions.  	    |
-| result  	| Required  	| A Stellar expression that is executed when the window period expires.  The expression is expected to in some way summarize the messages that were applied to the profile over the window period.  The expression must result in a numeric value such as a Double, Long, Float, Short, or Integer.  	    |
+| Name 	                |               | Description 	
+|---	                |---	        |---
+| [profile](#profile)   | Required   	| Unique name identifying the profile. 
+| [foreach](#foreach)   | Required  	| A separate profile is maintained "for each" of these.
+| [onlyif](#onlyif)  	| Optional  	| Boolean expression that determines if a message should be applied to the profile.
+| [groupBy](#groupby)   | Optional      | One or more Stellar expressions used to group the profile measurements when persisted.
+| [init](#init)  	    | Optional  	| One or more expressions executed at the start of a window period.
+| [update](#update)  	| Required  	| One or more expressions executed when a message is applied to the profile.
+| [result](#result)   	| Required  	| A Stellar expression that is executed when the window period expires.
+
+#### `profile` 
+
+*Required*
+
+A unique name identifying the profile.  The field is treated as a string. 
+
+#### `foreach`
+
+*Required*
+
+A separate profile is maintained 'for each' of these.  This is effectively the entity that the profile is describing.  The field is expected to contain a Stellar expression whose result is the entity name.  
+
+For example, if `ip_src_addr` then a separate profile would be maintained for each unique IP source address in the data; 10.0.0.1, 10.0.0.2, etc.
+
+#### `onlyif`
+
+*Optional*
+
+An expression that determines if a message should be applied to the profile.  A Stellar expression that returns a Boolean is expected.  A message is only applied to a profile if this expression is true. This allows a profile to filter the messages that get applied to it. 
+
+#### `groupBy`
+
+*Optional*
+
+One or more Stellar expressions used to group the profile measurements when persisted. This is intended to sort the Profile data to allow for a contiguous scan when accessing subsets of the data. 
+
+The 'groupBy' expressions can refer to any field within a `org.apache.metron.profiler.ProfileMeasurement`.  This includes the following fields: 
+  * `profileName`: The name of the profile.
+  * `entity`: The name of the entity being profiled.
+  * `start`: The window start time in milliseconds from the epoch.
+  * `end`: The window end time in milliseconds from the epoch.
+  * `value`: The value calculated over the window period.
+  * `groupBy`: The set of 'groupBy' expressions; not the result of those expressions.
+
+A common use case would be grouping by day of week.  This allows a contiguous scan to access all profile data for Mondays only.  Using the following definition would achieve this. 
+
+```
+"groupBy": [ "DAY_OF_WEEK(start)" ] 
+```
+
+*NOTE*: A series of date functions will be added to Stellar in a follow-on PR to enhance the types of groups that can be created.
+
+#### `init`
+
+*Optional*
+
+One or more expressions executed at the start of a window period.  A map is expected where the key is the variable name and the value is a Stellar expression.  The map can contain 0 or more variables/expressions. At the start of each window period the expression is executed once and stored in a variable with the given name. 
+
+```
+"init": {
+  "var1": "0",
+  "var2": "1"
+}
+```
+
+#### `update`
+
+*Required*
+
+One or more expressions executed when a message is applied to the profile.  A map is expected where the key is the variable name and the value is a Stellar expression.  The map can include 0 or more variables/expressions. When each message is applied to the profile, the expression is executed and stored in a variable with the given name.
+ 
+```
+"update": {
+  "var1": "var1 + 1",
+  "var2": "var2 + 1"
+}
+``` 
+
+#### `result`
+
+*Required*
+
+A Stellar expression that is executed when the window period expires.  The expression is expected to summarize the messages that were applied to the profile over the window period.  The expression must result in a numeric value such as a Double, Long, Float, Short, or Integer.
 
 ### Examples
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index 6db76fe..351c5d4 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -20,6 +20,8 @@
 
 package org.apache.metron.profiler;
 
+import java.util.List;
+
 /**
  * Represents a single data point within a Profile.
  *
@@ -53,6 +55,11 @@ public class ProfileMeasurement {
    */
   private Object value;
 
+  /**
+   * A set of expressions used to group the profile measurements when persisted.
+   */
+  private List<String> groupBy;
+
   public String getProfileName() {
     return profileName;
   }
@@ -93,6 +100,14 @@ public class ProfileMeasurement {
     this.value = value;
   }
 
+  public List<String> getGroupBy() {
+    return groupBy;
+  }
+
+  public void setGroupBy(List<String> groupBy) {
+    this.groupBy = groupBy;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -104,8 +119,8 @@ public class ProfileMeasurement {
     if (end != that.end) return false;
     if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
     if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
-    return value != null ? value.equals(that.value) : that.value == null;
-
+    if (value != null ? !value.equals(that.value) : that.value != null) return false;
+    return groupBy != null ? groupBy.equals(that.groupBy) : that.groupBy == null;
   }
 
   @Override
@@ -115,6 +130,7 @@ public class ProfileMeasurement {
     result = 31 * result + (int) (start ^ (start >>> 32));
     result = 31 * result + (int) (end ^ (end >>> 32));
     result = 31 * result + (value != null ? value.hashCode() : 0);
+    result = 31 * result + (groupBy != null ? groupBy.hashCode() : 0);
     return result;
   }
 
@@ -126,6 +142,7 @@ public class ProfileMeasurement {
             ", start=" + start +
             ", end=" + end +
             ", value=" + value +
+            ", groupBy=" + groupBy +
             '}';
   }
 }
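
Note on the `equals`/`hashCode` change above: the new `groupBy` field has to take part in both methods, otherwise two measurements that differ only in their grouping would compare equal. A minimal sketch of the same contract, written with `java.util.Objects` instead of the hand-rolled null checks, follows. The class `MeasurementSketch` is hypothetical and carries only two of the real fields; it is not the code from this commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for ProfileMeasurement.
public class MeasurementSketch {

  private final String profileName;
  private final List<String> groupBy;   // may be null when no groupBy is defined

  public MeasurementSketch(String profileName, List<String> groupBy) {
    this.profileName = profileName;
    this.groupBy = groupBy;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    MeasurementSketch that = (MeasurementSketch) o;
    // Objects.equals is null-safe, so a null groupBy only equals another null groupBy
    return Objects.equals(profileName, that.profileName)
        && Objects.equals(groupBy, that.groupBy);
  }

  @Override
  public int hashCode() {
    // every field used in equals() must also feed the hash
    return Objects.hash(profileName, groupBy);
  }

  public static void main(String[] args) {
    MeasurementSketch a = new MeasurementSketch("profile", Arrays.asList("DAY_OF_WEEK(start)"));
    MeasurementSketch b = new MeasurementSketch("profile", Arrays.asList("DAY_OF_WEEK(start)"));
    MeasurementSketch c = new MeasurementSketch("profile", null);
    System.out.println(a.equals(b));   // same profile, same groupBy
    System.out.println(a.equals(c));   // same profile, different groupBy
  }
}
```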

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 698e5d0..40cbb0e 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -40,7 +40,6 @@ import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Map;
 
 import static java.lang.String.format;
@@ -85,11 +84,6 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    */
   private transient JSONParser parser;
 
-  /**
-   * Stellar context
-   */
-  private Context stellarContext;
-
   private OutputCollector collector;
 
   /**
@@ -111,10 +105,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   }
 
   protected void initializeStellar() {
-    stellarContext = new Context.Builder()
-                         .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-                         .build();
-    StellarFunctions.initialize(stellarContext);
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .build();
+    StellarFunctions.initialize(context);
+    executor.setContext(context);
   }
 
   @Override
@@ -196,7 +191,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     try {
       JSONObject message = (JSONObject) input.getValueByField("message");
       Map<String, String> expressions = profileConfig.getInit();
-      expressions.forEach((var, expr) -> executor.assign(var, expr, message, stellarContext));
+      expressions.forEach((var, expr) -> executor.assign(var, expr, message));
 
     } catch(ParseException e) {
       String msg = format("Bad 'init' expression: %s, profile=%s, entity=%s, start=%d",
@@ -215,7 +210,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     // execute each of the 'update' expressions
     try {
       Map<String, String> expressions = profileConfig.getUpdate();
-      expressions.forEach((var, expr) -> executor.assign(var, expr, message, stellarContext));
+      expressions.forEach((var, expr) -> executor.assign(var, expr, message));
 
     } catch(ParseException e) {
       String msg = format("Bad 'update' expression: %s, profile=%s, entity=%s, start=%d",
@@ -239,7 +234,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     Object result;
     try {
       String resultExpr = profileConfig.getResult();
-      result = executor.execute(resultExpr, new JSONObject(), Object.class, stellarContext);
+      result = executor.execute(resultExpr, new JSONObject(), Object.class);
     
     } catch(ParseException e) {
       throw new ParseException("Bad 'result' expression", e);
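
Note on the `init`, `update`, and `result` calls above: the executor keeps its variables between calls, which is what lets an `update` expression such as `var1 + 1` see the value that `init` stored. The following is a rough sketch of that lifecycle under stated assumptions: plain Java lambdas stand in for Stellar expressions, and `WindowLifecycleSketch`, `Expr`, and `runWindow` are hypothetical names that do not exist in Metron.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class WindowLifecycleSketch {

  // Stand-in for a Stellar expression: reads (state, message) and returns a value.
  interface Expr extends BiFunction<Map<String, Object>, Map<String, Object>, Object> {}

  // Variables that survive across calls, like the executor's state.
  private final Map<String, Object> state = new HashMap<>();

  void assign(String variable, Expr expr, Map<String, Object> message) {
    state.put(variable, expr.apply(state, message));
  }

  Object execute(Expr expr, Map<String, Object> message) {
    return expr.apply(state, message);
  }

  // Runs one window: 'init' once, 'update' per message, 'result' at the end.
  static Object runWindow(List<Map<String, Object>> messages) {
    WindowLifecycleSketch executor = new WindowLifecycleSketch();

    Map<String, Expr> init = new LinkedHashMap<>();
    init.put("var1", (s, m) -> 0);                               // "var1": "0"

    Map<String, Expr> update = new LinkedHashMap<>();
    update.put("var1", (s, m) -> (Integer) s.get("var1") + 1);   // "var1": "var1 + 1"

    Expr result = (s, m) -> s.get("var1");                       // "result": "var1"

    Map<String, Object> empty = new HashMap<>();
    init.forEach((var, expr) -> executor.assign(var, expr, empty));
    for (Map<String, Object> message : messages) {
      update.forEach((var, expr) -> executor.assign(var, expr, message));
    }
    return executor.execute(result, empty);
  }

  public static void main(String[] args) {
    List<Map<String, Object>> messages = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      messages.add(new HashMap<>());
    }
    System.out.println(runWindow(messages));   // prints 3: one increment per message
  }
}
```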

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index 492437e..f76b943 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -21,14 +21,19 @@
 package org.apache.metron.profiler.bolt;
 
 import backtype.storm.tuple.Tuple;
+import org.apache.commons.beanutils.BeanMap;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.stellar.StellarExecutor;
 import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
 import org.apache.storm.hbase.common.ColumnList;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Calendar;
+
+import static java.lang.String.format;
 
 /**
  * An HbaseMapper that defines how a ProfileMeasurement is persisted within an HBase table.
@@ -36,9 +41,20 @@ import java.util.Calendar;
 public class ProfileHBaseMapper implements HBaseMapper {
 
   /**
-   * A salt is prepended to the row key to help prevent hotspotting.  This constant is used
-   * to generate the salt.  Ideally, this constant should be roughly equal to the number of
-   * nodes in the Hbase cluster.
+   * Executes Stellar code and maintains state across multiple invocations.
+   */
+  private StellarExecutor executor;
+
+  /**
+   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
+   * divisor is used to generate the salt.
+   *
+   * If the salt divisor is 0, a salt will not be used.  By default, the salt is set
+   * to 0 and is not used in the row key.
+   *
+   * If the salt divisor is not 0, the salt will be prepended to the row key to help
+   * prevent hot-spotting.  When used this constant should be roughly equal to the
+   * number of nodes in the Hbase cluster.
    */
   private int saltDivisor;
 
@@ -61,18 +77,67 @@ public class ProfileHBaseMapper implements HBaseMapper {
   public byte[] rowKey(Tuple tuple) {
     ProfileMeasurement m = (ProfileMeasurement) tuple.getValueByField("measurement");
 
-    // create a calendar to determine day-of-week, etc
-    Calendar calendar = Calendar.getInstance();
-    calendar.setTimeInMillis(m.getStart());
-
-    return Bytes.toBytes(getSalt(m.getStart()) +
-                    m.getProfileName() +
-                    calendar.get(Calendar.DAY_OF_WEEK) +
-                    calendar.get(Calendar.WEEK_OF_MONTH) +
-                    calendar.get(Calendar.MONTH) +
-                    calendar.get(Calendar.YEAR) +
-                    m.getEntity() +
-                    m.getStart());
+    // execute the 'groupBy' expressions to determine the 'groups' used in the row key
+    String groups = executeGroupBy(m);
+
+    // row key = profile + entity + [group1, ...] + timestamp
+    int length = m.getProfileName().length() + m.getEntity().length() + groups.length() + Long.BYTES;
+
+    ByteBuffer buffer;
+    if(saltDivisor > 0) {
+      // the row key needs to be prepended with a salt
+      byte[] salt = getSalt(m.getStart(), saltDivisor);
+      buffer = ByteBuffer
+              .allocate(length + salt.length)
+              .put(salt);
+
+    } else {
+      // no salt is needed
+      buffer = ByteBuffer
+              .allocate(length);
+    }
+
+    // append the remainder of the fields
+    buffer.put(m.getProfileName().getBytes())
+            .put(m.getEntity().getBytes())
+            .put(groups.getBytes())
+            .putLong(m.getStart());
+
+    buffer.flip();
+    return buffer.array();
+  }
+
+  /**
+   * Executes each of the 'groupBy' expressions.  The results of each
+   * are then appended to one another and returned as a String.
+   * @param m
+   * @return
+   */
+  private String executeGroupBy(ProfileMeasurement m) {
+
+    if(m.getGroupBy() == null || m.getGroupBy().size() == 0) {
+      // no groupBy expressions define
+      return "";
+    }
+
+    // allows each 'groupBy' expression to refer to the fields of the ProfileMeasurement
+    BeanMap measureAsMap = new BeanMap(m);
+    StringBuilder builder = new StringBuilder();
+
+    try {
+      // execute each of the 'groupBy' - build a String out of the results
+      for (String expr : m.getGroupBy()) {
+        Object result = executor.execute(expr, measureAsMap, Object.class);
+        builder.append(result);
+      }
+
+    } catch(Throwable e) {
+      String msg = format("Bad 'groupBy' expression: %s, profile=%s, entity=%s, start=%d",
+              e.getMessage(), m.getProfileName(), m.getEntity(), m.getStart());
+      throw new ParseException(msg, e);
+    }
+
+    return builder.toString();
   }
 
   /**
@@ -131,11 +196,12 @@ public class ProfileHBaseMapper implements HBaseMapper {
    *
    * @param epoch The timestamp in epoch millis to use in generating the salt.
    */
-  private int getSalt(long epoch) {
+  public static byte[] getSalt(long epoch, int saltDivisor) {
     try {
       MessageDigest digest = MessageDigest.getInstance("MD5");
       byte[] hash = digest.digest(Bytes.toBytes(epoch));
-      return Bytes.toInt(hash) % saltDivisor;
+      int salt = Bytes.toInt(hash) % saltDivisor;
+      return Bytes.toBytes(salt);
 
     } catch(NoSuchAlgorithmException e) {
       throw new RuntimeException(e);
@@ -158,6 +224,14 @@ public class ProfileHBaseMapper implements HBaseMapper {
     this.saltDivisor = saltDivisor;
   }
 
+  public StellarExecutor getExecutor() {
+    return executor;
+  }
+
+  public void setExecutor(StellarExecutor executor) {
+    this.executor = executor;
+  }
+
   public static final byte[] QPROFILE = Bytes.toBytes("profile");
   public static final byte[] QENTITY = Bytes.toBytes("entity");
   public static final byte[] QSTART = Bytes.toBytes("start");
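
Note on the row key layout above (`[salt] + profile + entity + [group1, ...] + timestamp`): the following standalone sketch may help when reasoning about key lengths and scan prefixes. It deliberately differs from the commit in three ways, all of which are assumptions of the sketch. It uses only the JDK instead of HBase's `Bytes` utility. It encodes strings explicitly as UTF-8 and sizes the buffer from byte lengths, whereas the commit sizes it from `String.length()`, which matches only when every character encodes to one byte. And it uses `Math.floorMod` so the salt bucket is never negative, whereas Java's `%` keeps the sign of the dividend. `RowKeySketch` and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class RowKeySketch {

  // Derives a 4-byte salt from the timestamp; the bucket is always in [0, saltDivisor).
  static byte[] salt(long epoch, int saltDivisor) {
    try {
      MessageDigest digest = MessageDigest.getInstance("MD5");
      byte[] hash = digest.digest(ByteBuffer.allocate(Long.BYTES).putLong(epoch).array());
      // interpret the first four bytes of the hash as a big-endian int
      int bucket = Math.floorMod(ByteBuffer.wrap(hash).getInt(), saltDivisor);
      return ByteBuffer.allocate(Integer.BYTES).putInt(bucket).array();

    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  // row key = [salt] + profile + entity + groups + start timestamp
  static byte[] rowKey(String profile, String entity, String groups, long start, int saltDivisor) {
    byte[] p = profile.getBytes(StandardCharsets.UTF_8);
    byte[] e = entity.getBytes(StandardCharsets.UTF_8);
    byte[] g = groups.getBytes(StandardCharsets.UTF_8);
    byte[] s = saltDivisor > 0 ? salt(start, saltDivisor) : new byte[0];

    // the buffer is sized exactly, so array() returns the complete key
    return ByteBuffer
        .allocate(s.length + p.length + e.length + g.length + Long.BYTES)
        .put(s)
        .put(p)
        .put(e)
        .put(g)
        .putLong(start)
        .array();
  }

  public static void main(String[] args) {
    byte[] unsalted = rowKey("profile", "entity", "group1", 20000L, 0);
    byte[] salted = rowKey("profile", "entity", "group1", 20000L, 10);
    System.out.println(unsalted.length);   // 7 + 6 + 6 + 8 = 27
    System.out.println(salted.length);     // 4 + 27 = 31
  }
}
```

Because the groups sit between the entity and the timestamp, all rows for one profile, entity, and group share a common prefix, which is what makes the contiguous scan described in the README possible when no salt is used.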

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index 6008dab..d389410 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -66,12 +66,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   private StellarExecutor executor;
 
   /**
-   * Stellar context
-   */
-  private Context stellarContext;
-
-
-  /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt.
    */
   public ProfileSplitterBolt(String zookeeperUrl) {
@@ -87,10 +81,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   }
 
   protected void initializeStellar() {
-    stellarContext = new Context.Builder()
+    Context context = new Context.Builder()
                          .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
                          .build();
-    StellarFunctions.initialize(stellarContext);
+    StellarFunctions.initialize(context);
+    executor.setContext(context);
   }
 
   @Override
@@ -135,10 +130,10 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
 
     // is this message needed by this profile?
     String onlyIf = profile.getOnlyif();
-    if (StringUtils.isBlank(onlyIf) || executor.execute(onlyIf, message, Boolean.class, stellarContext)) {
+    if (StringUtils.isBlank(onlyIf) || executor.execute(onlyIf, message, Boolean.class)) {
 
       // what is the name of the entity in this message?
-      String entity = executor.execute(profile.getForeach(), message, String.class, stellarContext);
+      String entity = executor.execute(profile.getForeach(), message, String.class);
 
       // emit a message for the bolt responsible for building this profile
       collector.emit(input, new Values(entity, profile, message));
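
Note on the splitter logic above: a message is routed to a profile only when `onlyif` is blank or evaluates to true, and the `foreach` expression then yields the entity name. A minimal sketch of that decision follows, with a `Predicate` and a `Function` standing in for the two Stellar expressions. `SplitterSketch` and `route` are hypothetical names, and a `null` predicate plays the role of a blank `onlyif`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class SplitterSketch {

  // Returns the entity name if the message applies to the profile, otherwise empty.
  static Optional<String> route(Map<String, Object> message,
                                Predicate<Map<String, Object>> onlyIf,
                                Function<Map<String, Object>, String> foreach) {
    // a missing 'onlyif' means every message is applied to the profile
    if (onlyIf == null || onlyIf.test(message)) {
      return Optional.ofNullable(foreach.apply(message));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, Object> message = new HashMap<>();
    message.put("ip_src_addr", "10.0.0.1");
    message.put("protocol", "HTTP");

    // foreach: "ip_src_addr"; onlyif: protocol == 'HTTP'
    Optional<String> entity = route(
        message,
        m -> "HTTP".equals(m.get("protocol")),
        m -> String.valueOf(m.get("ip_src_addr")));

    System.out.println(entity.orElse("(filtered out)"));   // prints 10.0.0.1
  }
}
```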

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
index 7eaf38a..ae786f2 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
@@ -27,7 +27,6 @@ import org.apache.metron.common.dsl.StellarFunctions;
 import org.apache.metron.common.dsl.VariableResolver;
 import org.apache.metron.common.stellar.StellarProcessor;
 import org.apache.metron.common.utils.ConversionUtils;
-import org.json.simple.JSONObject;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -43,14 +42,22 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
    */
   private Map<String, Object> state;
 
+  /**
+   * Provides additional context for initializing certain Stellar functions.  For
+   * example, references to find Zookeeper or HBase.
+   */
+  private Context context;
+
   public DefaultStellarExecutor() {
     clearState();
+    context = Context.EMPTY_CONTEXT();
   }
 
   /**
    * @param initialState Initial state loaded into the execution environment.
    */
   public DefaultStellarExecutor(Map<String, Object> initialState) {
+    this();
     this.state = new HashMap<>(initialState);
   }
 
@@ -65,11 +72,10 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
    * @param variable The variable name to assign to.
    * @param expression The expression to execute.
    * @param message The message that provides additional context for the expression.
-   * @param stellarContext The context which holds global state for Stellar functions
    */
   @Override
-  public void assign(String variable, String expression, JSONObject message, Context stellarContext) {
-    Object result = execute(expression, message, stellarContext);
+  public void assign(String variable, String expression, Map<String, Object> message) {
+    Object result = execute(expression, message);
     state.put(variable, result);
   }
 
@@ -80,11 +86,10 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
    * @param message The message that is accessible when Stellar is executed.
    * @param clazz The expected class of the expression's result.
    * @param <T> The expected class of the expression's result.
-   * @param stellarContext The context which holds global state for Stellar functions
    */
   @Override
-  public <T> T execute(String expr, JSONObject message, Class<T> clazz, Context stellarContext) {
-    Object resultObject = execute(expr, message, stellarContext);
+  public <T> T execute(String expr, Map<String, Object> message, Class<T> clazz) {
+    Object resultObject = execute(expr, message);
 
     // perform type conversion, if necessary
     T result = ConversionUtils.convert(resultObject, clazz);
@@ -102,17 +107,26 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
   }
 
   /**
+   * Sets the Context for the Stellar execution environment.  This provides global data used
+   * to initialize Stellar functions.
+   * @param context The Stellar context.
+   */
+  @Override
+  public void setContext(Context context) {
+    this.context = context;
+  }
+
+  /**
    * Execute a Stellar expression.
    *
    * @param expr The expression to execute.
    * @param msg The message that is accessible when Stellar is executed.
-   * @param stellarContext The context which holds global state for Stellar functions
    */
-  private Object execute(String expr, JSONObject msg, Context stellarContext) {
+  private Object execute(String expr, Map<String, Object> msg) {
     try {
       VariableResolver resolver = new MapVariableResolver(state, msg);
       StellarProcessor processor = new StellarProcessor();
-      return processor.parse(expr, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
+      return processor.parse(expr, resolver, StellarFunctions.FUNCTION_RESOLVER(), context);
 
     } catch (ParseException e) {
       throw new ParseException(String.format("Bad expression: expr=%s, msg=%s, state=%s", expr, msg, state));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
index 8d6660c..66e3ad1 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
@@ -40,20 +40,18 @@ public interface StellarExecutor {
    * @param variable The name of the variable to assign to.
    * @param expression The expression to execute.
    * @param message The message that provides additional context for the expression.
-   * @param stellarContext The context which holds global state for Stellar functions
    */
-  void assign(String variable, String expression, JSONObject message, Context stellarContext);
+  void assign(String variable, String expression, Map<String, Object> message);
 
   /**
    * Execute a Stellar expression and return the result.
    *
    * @param expression The expression to execute.
-   * @param message The message that is accessible when Stellar is executed.
+   * @param message A map of values, most often the JSON message itself, that is accessible within Stellar.
    * @param clazz The expected class of the expression's result.
    * @param <T> The expected class of the expression's result.
-   * @param stellarContext The context which holds global state for Stellar functions
    */
-  <T> T execute(String expression, JSONObject message, Class<T> clazz, Context stellarContext);
+  <T> T execute(String expression, Map<String, Object> message, Class<T> clazz);
 
   /**
    * The current state of the Stellar execution environment.
@@ -64,4 +62,11 @@ public interface StellarExecutor {
    * Removes all state from the execution environment.
    */
   void clearState();
+
+  /**
+   * Sets the Context for the Stellar execution environment.  This provides global data used
+   * to initialize Stellar functions.
+   * @param context The Stellar context.
+   */
+  void setContext(Context context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
new file mode 100644
index 0000000..e96f2f5
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -0,0 +1,239 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+/**
+ * Tests the ProfileHBaseMapper class.
+ */
+public class ProfileHBaseMapperTest {
+
+  Tuple tuple;
+  ProfileHBaseMapper mapper;
+  ProfileMeasurement measurement;
+  DefaultStellarExecutor executor;
+
+  @Before
+  public void setup() {
+    executor = new DefaultStellarExecutor();
+
+    mapper = new ProfileHBaseMapper();
+    mapper.setExecutor(executor);
+    mapper.setSaltDivisor(0);
+
+    measurement = new ProfileMeasurement();
+    measurement.setProfileName("profile");
+    measurement.setEntity("entity");
+    measurement.setValue(22);
+    measurement.setStart(20000);
+    measurement.setEnd(50000);
+
+    // the tuple will contain the profile measurement
+    tuple = mock(Tuple.class);
+    when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement);
+  }
+
+  /**
+   * There is a single group in the 'groupBy' expression that simply returns the string "group1".
+   */
+  @Test
+  public void testRowKeyWithOneGroupBy() throws Exception {
+    // setup
+    measurement.setGroupBy(Arrays.asList("'group1'"));
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put("group1".getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * The user can define multiple 'groupBy' expressions.
+   */
+  @Test
+  public void testRowKeyWithTwoGroupBy() throws Exception {
+    // setup
+    measurement.setGroupBy(Arrays.asList("'group1'", "'group2'"));
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put("group1".getBytes())
+            .put("group2".getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * A 'groupBy' expression can return any type, not just Strings.
+   */
+  @Test
+  public void testRowKeyWithOneIntegerGroupBy() throws Exception {
+    // setup
+    measurement.setGroupBy(Arrays.asList("200"));
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put(Integer.valueOf(200).toString().getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * A user does not have to define a 'groupBy'.  It is an optional field.
+   */
+  @Test
+  public void testRowKeyWithNoGroupBy() throws Exception {
+    // setup
+    measurement.setGroupBy(Collections.emptyList());
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * A null 'groupBy' is handled the same as an empty one.  It is an optional field.
+   */
+  @Test
+  public void testRowKeyWithNullGroupBy() throws Exception {
+    // setup
+    measurement.setGroupBy(null);
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * A 'groupBy' expression can refer to the fields within the ProfileMeasurement.  The
+   * most important fields are likely the starting and ending timestamps.  With these fields
+   * any calendar-based group can be defined.  For example, the day of week, week of month, etc.
+   * can all be calculated from these values.
+   */
+  @Test
+  public void testRowKeyWithGroupByUsingMeasurementField() throws Exception {
+
+    // setup - the group expression refers to the 'end' timestamp contained within the ProfileMeasurement
+    measurement.setGroupBy(Arrays.asList("end"));
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put(Long.valueOf(measurement.getEnd()).toString().getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * If the saltDivisor > 0, then the row key should be prepended with a salt.  This
+   * can be used to prevent hotspotting.
+   */
+  @Test
+  public void testRowKeyWithSalt() throws Exception {
+    // setup
+    mapper.setSaltDivisor(100);
+    measurement.setGroupBy(Collections.emptyList());
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(ProfileHBaseMapper.getSalt(measurement.getStart(), mapper.getSaltDivisor()))
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .putLong(measurement.getStart());
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = mapper.rowKey(tuple);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+}

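The tests above all assert the same row key layout: profile name, entity, each group value rendered as a string, then the window start as an 8-byte long. A minimal sketch of that layout follows. It is not the actual `ProfileHBaseMapper` code: `RowKeySketch` and its `rowKey` helper are hypothetical names, UTF-8 is an assumed encoding (the tests use the platform default via `getBytes()`), and the optional salt prefix is left out because its computation is not shown in this part of the diff.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical illustration of the row key layout asserted by the tests;
// not part of the Metron code base.
final class RowKeySketch {

  // Builds: profile bytes + entity bytes + one string per group + start (8-byte long).
  static byte[] rowKey(String profile, String entity, List<?> groups, long start) {
    byte[] profileBytes = profile.getBytes(StandardCharsets.UTF_8);
    byte[] entityBytes = entity.getBytes(StandardCharsets.UTF_8);

    // a null 'groupBy' is treated like an empty one
    int groupCount = (groups == null) ? 0 : groups.size();
    byte[][] groupBytes = new byte[groupCount][];
    int size = profileBytes.length + entityBytes.length + Long.BYTES;
    for (int i = 0; i < groupCount; i++) {
      // any result type is rendered as a string, so the Integer 200 becomes "200"
      groupBytes[i] = String.valueOf(groups.get(i)).getBytes(StandardCharsets.UTF_8);
      size += groupBytes[i].length;
    }

    ByteBuffer buffer = ByteBuffer.allocate(size).put(profileBytes).put(entityBytes);
    for (byte[] group : groupBytes) {
      buffer.put(group);
    }
    buffer.putLong(start);

    // the buffer was sized exactly, so the backing array is the complete key
    return buffer.array();
  }
}
```

Because the window start comes last, rows that share a profile, entity, and group values sit next to each other in key order, sorted by time.
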
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
index 81df399..c11d94a 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
@@ -60,6 +60,7 @@ public class DefaultStellarExecutorTest {
 
     // create the executor to test
     executor = new DefaultStellarExecutor();
+    executor.setContext(Context.EMPTY_CONTEXT());
   }
 
   /**
@@ -67,7 +68,7 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testAssign() {
-    executor.assign("foo", "2", message, Context.EMPTY_CONTEXT());
+    executor.assign("foo", "2", message);
 
     // verify
     Object var = executor.getState().get("foo");
@@ -80,7 +81,7 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testAssignWithVariableResolution() {
-    executor.assign("foo", "ip_src_addr", message, Context.EMPTY_CONTEXT());
+    executor.assign("foo", "ip_src_addr", message);
 
     // verify
     Object var = executor.getState().get("foo");
@@ -93,9 +94,9 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testState() {
-    executor.assign("two", "2", message, Context.EMPTY_CONTEXT());
-    executor.assign("four", "4", message, Context.EMPTY_CONTEXT());
-    executor.assign("sum", "two + four", message, Context.EMPTY_CONTEXT());
+    executor.assign("two", "2", message);
+    executor.assign("four", "4", message);
+    executor.assign("sum", "two + four", message);
 
     // verify
     Object var = executor.getState().get("sum");
@@ -107,7 +108,7 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testClearState() {
-    executor.assign("two", "2", message, Context.EMPTY_CONTEXT());
+    executor.assign("two", "2", message);
     executor.clearState();
 
     // verify
@@ -123,7 +124,7 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testExecuteTransformation() {
-    String actual = executor.execute("TO_UPPER('lowercase')", message, String.class, Context.EMPTY_CONTEXT());
+    String actual = executor.execute("TO_UPPER('lowercase')", message, String.class);
     assertThat(actual, equalTo("LOWERCASE"));
   }
 
@@ -136,7 +137,7 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testExecutePredicate() {
-    boolean actual = executor.execute("IS_INTEGER(2)", message, Boolean.class, Context.EMPTY_CONTEXT());
+    boolean actual = executor.execute("IS_INTEGER(2)", message, Boolean.class);
     assertThat(actual, equalTo(true));
   }
 
@@ -145,7 +146,7 @@ public class DefaultStellarExecutorTest {
    */
   @Test(expected = RuntimeException.class)
   public void testExecuteWithWrongType() {
-    executor.execute("2 + 2", message, Boolean.class, Context.EMPTY_CONTEXT());
+    executor.execute("2 + 2", message, Boolean.class);
   }
 
   /**
@@ -153,9 +154,9 @@ public class DefaultStellarExecutorTest {
    */
   @Test
   public void testExecuteWithTypeConversion() {
-    executor.execute("2", message, Double.class, Context.EMPTY_CONTEXT());
-    executor.execute("2", message, Float.class, Context.EMPTY_CONTEXT());
-    executor.execute("2", message, Short.class, Context.EMPTY_CONTEXT());
-    executor.execute("2", message, Long.class, Context.EMPTY_CONTEXT());
+    executor.execute("2", message, Double.class);
+    executor.execute("2", message, Float.class);
+    executor.execute("2", message, Short.class);
+    executor.execute("2", message, Long.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index f832e1d..34dab13 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -18,7 +18,9 @@
 package org.apache.metron.common.configuration.profiler;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -27,36 +29,56 @@ import java.util.Map;
 public class ProfileConfig implements Serializable {
 
   /**
-   * The name of the profile.
+   * A unique name identifying the profile.  The field is treated as a string.
    */
   private String profile;
 
   /**
-   * Stella code that when executed results in the name of the entity being profiled.  A
-   * profile is created 'for-each' of these; hence the name.
+   * A separate profile is maintained for each of these.  This is effectively the
+   * entity that the profile is describing.  The field is expected to contain a
+   * Stellar expression whose result is the entity name.  For example, if the
+   * expression is `ip_src_addr`, then a separate profile is maintained for each
+   * unique IP source address in the data: 10.0.0.1, 10.0.0.2, etc.
    */
   private String foreach;
 
   /**
-   * Stella code that when executed determines whether a message should be included in this
-   * profile.
+   * An expression that determines if a message should be applied to the profile.  A
+   * Stellar expression is expected that when executed returns a boolean.  A message
+   * is only applied to a profile if this condition is true. This allows a profile
+   * to filter the messages that it receives.
    */
   private String onlyif;
 
   /**
-   * Stella code that when executed results in a single measurement that is stored with the Profile.
+   * A set of expressions that is executed at the start of a window period.  A map is
+   * expected where the key is the variable name and the value is a Stellar expression.
+   * The map can contain 0 or more variables/expressions. At the start of each window
+   * period the expression is executed once and stored in a variable with the given
+   * name.
    */
-  private String result;
+  private Map<String, String> init = new HashMap<>();
 
   /**
-   * Defines how the state is initialized before any messages are received.
+   * A set of expressions that is executed when a message is applied to the profile.
+   * A map is expected where the key is the variable name and the value is a Stellar
+   * expression.  The map can include 0 or more variables/expressions.
    */
-  private Map<String, String> init = new HashMap<>();
+  private Map<String, String> update = new HashMap<>();
 
   /**
-   * Defines how the state is updated when a new message is received.
+   * A list of Stellar expressions that is executed in order and used to group the
+   * resulting profile data.
    */
-  private Map<String, String> update = new HashMap<>();
+  private List<String> groupBy = new ArrayList<>();
+
+  /**
+   * A Stellar expression that is executed when the window period expires.  The
+   * expression is expected to in some way summarize the messages that were applied
+   * to the profile over the window period.  The expression must result in a numeric
+   * value such as a Double, Long, Float, Short, or Integer.
+   */
+  private String result;
 
   public String getProfile() {
     return profile;

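As a rough illustration of how the fields documented above fit together, a single profile definition might look like the fragment below. The field names come from `ProfileConfig`. The profile name, the expressions, and the values are made up for the example, and how such a definition is wrapped or loaded is not shown in this diff.

```json
{
  "profile": "example-profile",
  "foreach": "ip_src_addr",
  "onlyif": "true",
  "init": { "count": "0" },
  "update": { "count": "count + 1" },
  "groupBy": ["'group1'"],
  "result": "count"
}
```

Here `groupBy` holds a single constant expression, as in `testRowKeyWithOneGroupBy`. An expression such as `end`, which reads a field of the measurement, would take the same position in the list.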
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
index 8080363..9aa511c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
@@ -17,38 +17,43 @@
  */
 package org.apache.metron.common.dsl;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 
-public class Context {
+public class Context implements Serializable {
+
   public interface Capability {
     Object get();
   }
-  public enum Capabilities {
-      HBASE_PROVIDER
-    , ZOOKEEPER_CLIENT
-    , SERVICE_DISCOVERER;
 
+  public enum Capabilities {
+    HBASE_PROVIDER,
+    ZOOKEEPER_CLIENT,
+    SERVICE_DISCOVERER
   }
 
   public static class Builder {
+
     private Map<String, Capability> capabilityMap = new HashMap<>();
 
     public Builder with(String s, Capability capability) {
       capabilityMap.put(s, capability);
       return this;
     }
+
     public Builder with(Enum<?> s, Capability capability) {
       capabilityMap.put(s.toString(), capability);
       return this;
     }
-    public Context build() {
 
+    public Context build() {
       return new Context(capabilityMap);
     }
   }
+
   public static Context EMPTY_CONTEXT() {
     return
     new Context(new HashMap<>()){
@@ -58,15 +63,17 @@ public class Context {
       }
     };
   }
+
   private Map<String, Capability> capabilities;
-  private Context( Map<String, Capability> capabilities
-                 )
-  {
+
+  private Context( Map<String, Capability> capabilities) {
     this.capabilities = capabilities;
   }
+
   public Optional<Object> getCapability(Enum<?> capability) {
     return getCapability(capability.toString());
   }
+
   public Optional<Object> getCapability(String capability) {
     Capability c = capabilities.get(capability);
     if(c == null) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8b623b87/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
index 75e86aa..dd01775 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
@@ -110,7 +110,6 @@ public class StellarStatisticsFunctions {
     }
   }
 
-
   /**
    * Calculates the mean.
    *

