metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [12/18] metron git commit: METRON-987: Allow stellar enrichments to be specified by a list as well as a map closes apache/metron#609
Date Mon, 26 Jun 2017 17:27:33 GMT
METRON-987: Allow stellar enrichments to be specified by a list as well as a map closes apache/metron#609


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7d6121bf
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7d6121bf
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7d6121bf

Branch: refs/heads/Metron_0.4.0
Commit: 7d6121bfd3bd622940e9f411ac87b56d29755596
Parents: 61cbab4
Author: cstella <cestella@gmail.com>
Authored: Fri Jun 9 00:52:34 2017 -0400
Committer: cstella <cestella@gmail.com>
Committed: Fri Jun 9 00:52:34 2017 -0400

----------------------------------------------------------------------
 .../profiler/client/stellar/GetProfile.java     |   2 +-
 .../enrichment/handler/Config.java              |  41 +++-
 .../enrichment/handler/ConfigHandler.java       |  12 +-
 .../enrichment/handler/Configs.java             |   9 +-
 .../enrichment/handler/ListConfig.java          |  16 +-
 .../enrichment/handler/StellarConfig.java       |  82 ++++++--
 .../metron/common/message/JSONFromPosition.java |   7 +-
 .../common/stellar/BaseStellarProcessor.java    |   1 +
 .../common/stellar/StellarAssignment.java       | 135 +++++++++++++
 .../common/stellar/shell/StellarShell.java      |  19 +-
 .../StellarEnrichmentConfigTest.java            | 144 ++++++++++++++
 .../configuration/StellarEnrichmentTest.java    | 189 +++++++++++++++++++
 .../common/stellar/StellarAssignmentTest.java   |  63 +++++++
 metron-platform/metron-enrichment/README.md     |  91 +++++++--
 metron-platform/metron-enrichment/pom.xml       |   7 +
 .../adapters/stellar/StellarAdapter.java        |  98 +++++++---
 .../enrichment/bolt/EnrichmentJoinBolt.java     |   3 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |   2 +-
 .../adapters/stellar/StellarAdapterTest.java    | 134 +++++++++++++
 19 files changed, 975 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index ecce7e0..87232aa 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -247,7 +247,7 @@ public class GetProfile implements StellarFunction {
       return provider.getTable(HBaseConfiguration.create(), tableName);
 
     } catch (IOException e) {
-      throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName));
+      throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Config.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Config.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Config.java
index f447326..bbadeac 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Config.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Config.java
@@ -21,16 +21,53 @@ package org.apache.metron.common.configuration.enrichment.handler;
 import com.google.common.collect.ImmutableList;
 import org.json.simple.JSONObject;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
 public interface Config {
+
+  /**
+   * Split a message by the fields.  Certain configs will do this differently than others, but
+   * these are the messages sent to the enrichment adapter downstream.
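+   * @param message The message to split
+   * @param fields The fields to split by; how they are interpreted depends on the config
+   * @param fieldToEnrichmentKey Function from a field name to its enrichment key
+   * @param config The config to use
+   * @return The split messages, which are sent to the enrichment adapter downstream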
+   */
   List<JSONObject> splitByFields( JSONObject message
                           , Object fields
                           , Function<String, String> fieldToEnrichmentKey
-                          , Map<String, Object> config
+                          , Iterable<Map.Entry<String, Object>> config
                           );
 
-  List<String> getSubgroups(Map<String, Object> config);
+  default List<JSONObject> splitByFields( JSONObject message
+                          , Object fields
+                          , Function<String, String> fieldToEnrichmentKey
+                          , ConfigHandler handler
+                          ) {
+    return splitByFields(message, fields, fieldToEnrichmentKey, handler.getType().toConfig(handler.getConfig()));
+  }
+
+  /**
+   *
+   * Return the subgroups for a given enrichment.  This will allow the join bolt to know when the join is complete.
+   * NOTE: this implies that a given enrichment may have a 1 to many relationship with subgroups.
+   * @param config The enrichment configuration entries to derive the subgroups from
+   * @return The list of subgroups
+   */
+  List<String> getSubgroups(Iterable<Map.Entry<String, Object>> config);
+
+  default List<String> getSubgroups(ConfigHandler handler) {
+    return getSubgroups(handler.getType().toConfig(handler.getConfig()));
+  }
+
+  /**
+   * Convert a config object (currently either a map or list is supported) to a list of configs.
+   * @param c Either a map or list representing the enrichment adapter configuration.
+   * @return The configuration as an iterable of key/value entries
+   */
+  Iterable<Map.Entry<String, Object>> toConfig(Object c);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
index 996b33f..11a4852 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
@@ -17,15 +17,13 @@
  */
 package org.apache.metron.common.configuration.enrichment.handler;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class ConfigHandler {
-  private Map<String, Object> config;
+  private Object config;
   private Configs type = Configs.LIST;
   public ConfigHandler(String enrichment, Map<String, Object> obj) {
-    config = (Map<String, Object>) obj.get("config");
+    config = obj.get("config");
     if(obj.containsKey("type")) {
       type = Configs.valueOf((String) obj.get("type"));
     }
@@ -39,11 +37,11 @@ public class ConfigHandler {
     config = new HashMap<>();
     type = Configs.LIST;
   }
-  public Map<String, Object> getConfig() {
+  public Object getConfig() {
     return config;
   }
 
-  public void setConfig(Map<String, Object> config) {
+  public void setConfig(Object config) {
     this.config = config;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Configs.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Configs.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Configs.java
index 210caf4..dced564 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Configs.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/Configs.java
@@ -19,6 +19,7 @@ package org.apache.metron.common.configuration.enrichment.handler;
 
 import org.json.simple.JSONObject;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -36,16 +37,20 @@ public enum Configs implements Config {
   public List<JSONObject> splitByFields(JSONObject message
                                  , Object fields
                                  , Function<String, String> fieldToEnrichmentKey
-                                 , Map<String, Object> config
+                                 , Iterable<Map.Entry<String, Object>> config
                                  )
   {
     return configCreator.splitByFields(message, fields, fieldToEnrichmentKey, config);
   }
 
   @Override
-  public List<String> getSubgroups(Map<String, Object> config) {
+  public List<String> getSubgroups(Iterable<Map.Entry<String, Object>> config) {
     return configCreator.getSubgroups(config);
   }
 
+  @Override
+  public Iterable<Map.Entry<String, Object>> toConfig(Object c) {
+    return configCreator.toConfig(c);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ListConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ListConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ListConfig.java
index 7803947..852418a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ListConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ListConfig.java
@@ -20,6 +20,8 @@ package org.apache.metron.common.configuration.enrichment.handler;
 import com.google.common.collect.ImmutableList;
 import org.json.simple.JSONObject;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -30,7 +32,7 @@ public class ListConfig implements Config {
   public List<JSONObject> splitByFields( JSONObject message
                                  , Object fieldsObj
                                  , Function<String, String> fieldToEnrichmentKey
-                                 , Map<String, Object> config
+                                 , Iterable<Map.Entry<String, Object>> config
                                  )
   {
     List<String> fields = (List<String>)fieldsObj;
@@ -45,7 +47,17 @@ public class ListConfig implements Config {
   }
 
   @Override
-  public List<String> getSubgroups(Map<String, Object> config) {
+  public List<String> getSubgroups(Iterable<Map.Entry<String, Object>> config) {
     return ImmutableList.of("");
   }
+
+  @Override
+  public Iterable<Map.Entry<String, Object>> toConfig(Object c) {
+    if(c instanceof Map) {
+      return ((Map<String, Object>)c).entrySet();
+    }
+    else {
+      return new ArrayList<>();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/StellarConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/StellarConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/StellarConfig.java
index 9a26873..7d49fbd 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/StellarConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/StellarConfig.java
@@ -17,7 +17,10 @@
  */
 package org.apache.metron.common.configuration.enrichment.handler;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.stellar.StellarAssignment;
 import org.apache.metron.common.stellar.StellarProcessor;
 import org.json.simple.JSONObject;
 
@@ -27,14 +30,14 @@ import java.util.function.Function;
 public class StellarConfig implements Config {
 
   @Override
-  public List<String> getSubgroups(Map<String, Object> config) {
+  public List<String> getSubgroups(Iterable<Map.Entry<String, Object>> config) {
     boolean includeEmpty = false;
     List<String> ret = new ArrayList<>();
-    for(Map.Entry<String, Object> kv : config.entrySet()) {
+    for(Map.Entry<String, Object> kv : config) {
       if(kv.getValue() instanceof String) {
         includeEmpty = true;
       }
-      else if(kv.getValue() instanceof Map) {
+      else if(kv.getValue() instanceof Map || kv.getValue() instanceof List) {
         ret.add(kv.getKey());
       }
     }
@@ -45,46 +48,99 @@ public class StellarConfig implements Config {
   }
 
   @Override
+  public Iterable<Map.Entry<String, Object>> toConfig(Object c) {
+    if(c instanceof Map) {
+      return ((Map<String, Object>)c).entrySet();
+    }
+    else if(c instanceof Collection) {
+      List<Map.Entry<String, Object>> ret = new ArrayList<>();
+      for(Object o : (Collection)c) {
+        if(o instanceof String) {
+          StellarAssignment assignment = StellarAssignment.from((String)o);
+          ret.add(assignment);
+        }
+        else if(o instanceof Map.Entry) {
+          ret.add((Map.Entry<String, Object>)o);
+        }
+        else {
+          throw new IllegalStateException("Expected " + c + " to be a list of strings, but got non-string.");
+        }
+      }
+      return ret;
+    }
+    throw new IllegalStateException("Unable to convert config " + c
+                                   + " to stellar config.  Expected List<String> or Map<String, Object>");
+  }
+
+  @Override
   public List<JSONObject> splitByFields( JSONObject message
                                  , Object fields
                                  , Function<String, String> fieldToEnrichmentKey
-                                 , Map<String, Object> config
+                                 , Iterable<Map.Entry<String, Object>> config
                                  )
   {
     StellarProcessor processor = new StellarProcessor();
     List<JSONObject> messages = new ArrayList<>();
     Map<String, String> defaultStellarStatementGroup = new HashMap<>();
-    for(Map.Entry<String, Object> kv : config.entrySet()) {
+    for(Map.Entry<String, Object> kv : config) {
       if(kv.getValue() instanceof String) {
         defaultStellarStatementGroup.put(kv.getKey(), (String)kv.getValue());
       }
       else if(kv.getValue() instanceof Map) {
         JSONObject ret = new JSONObject();
-        ret.put(kv.getKey(), getMessage(processor, (Map<String, String>) kv.getValue(), message));
+        ret.put(kv.getKey(), getMessage(getFields(processor, (Map)kv.getValue()), message));
+        messages.add(ret);
+      }
+      else if(kv.getValue() instanceof List) {
+        JSONObject ret = new JSONObject();
+        ret.put(kv.getKey(), getMessage(getFields(processor, (List)kv.getValue()), message));
         messages.add(ret);
       }
     }
     if(defaultStellarStatementGroup.size() > 0)
     {
       JSONObject ret = new JSONObject();
-      ret.put("", getMessage(processor, defaultStellarStatementGroup, message));
+      ret.put("", getMessage(getFields(processor, defaultStellarStatementGroup), message));
       messages.add(ret);
     }
     return messages;
   }
 
-  private Map<String, Object> getMessage( StellarProcessor processor
-                                        , Map<String, String> stellarStatementGroup
-                                        , JSONObject message
-                                        )
+  private Set<String> getFields(StellarProcessor processor
+                               , List<String> stellarStatementGroup
+                               )
   {
     Set<String> stellarFields = new HashSet<>();
-    for(String stellarStatement: stellarStatementGroup.values()) {
+    for(String stellarStatementExpr: stellarStatementGroup) {
+      StellarAssignment assignment = StellarAssignment.from(stellarStatementExpr);
+      if(assignment.getStatement() != null) {
+        Set<String> variables = processor.variablesUsed(assignment.getStatement());
+        if (variables != null) {
+          stellarFields.addAll(variables);
+        }
+      }
+    }
+    return stellarFields;
+  }
+
+  private Set<String> getFields( StellarProcessor processor
+                               , Map<String, String> stellarStatementGroup
+  ) {
+    Set<String> stellarFields = new HashSet<>();
+    for (String stellarStatement : stellarStatementGroup.values()) {
       Set<String> variables = processor.variablesUsed(stellarStatement);
-      if(variables != null) {
+      if (variables != null) {
         stellarFields.addAll(variables);
       }
     }
+    return stellarFields;
+  }
+
+  private Map<String, Object> getMessage( Set<String> stellarFields
+                                        , JSONObject message
+                                        )
+  {
+
     Map<String, Object> messageSegment = new HashMap<>();
     for(String variable : stellarFields) {
       messageSegment.put(variable, message.get(variable));

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
index 4407d4f..c91a262 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.common.message;
 
+import org.apache.commons.io.Charsets;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -40,10 +41,12 @@ public class JSONFromPosition implements MessageGetStrategy {
 
   @Override
   public JSONObject get(Tuple tuple) {
+    String s = null;
     try {
-      return (JSONObject) parser.get().parse(new String(tuple.getBinary(position), "UTF8"));
+      s =  new String(tuple.getBinary(position), Charsets.UTF_8);
+      return (JSONObject) parser.get().parse(s);
     } catch (Exception e) {
-      throw new IllegalStateException(e.getMessage(), e);
+      throw new IllegalStateException("Unable to parse " + s + " due to " + e.getMessage(), e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/BaseStellarProcessor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/BaseStellarProcessor.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/BaseStellarProcessor.java
index a04ee43..8f4af93 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/BaseStellarProcessor.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/BaseStellarProcessor.java
@@ -234,4 +234,5 @@ public class BaseStellarProcessor<T> {
 
     return true;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarAssignment.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarAssignment.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarAssignment.java
new file mode 100644
index 0000000..c28ebfb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarAssignment.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.stellar;
+
+
+import java.util.Map;
+
+public class StellarAssignment implements Map.Entry<String, Object>{
+  private String variable;
+  private String statement;
+
+  public StellarAssignment(String variable, String statement) {
+    this.variable = variable;
+    this.statement = statement;
+  }
+
+  public String getVariable() {
+    return variable;
+  }
+
+  public String getStatement() {
+    return statement;
+  }
+
+  public static boolean isAssignment(String statement) {
+    return statement != null && statement.contains(":=");
+  }
+
+  public static StellarAssignment from(String statement) {
+    if(statement == null || statement.length() == 0) {
+      return new StellarAssignment(null, null);
+    }
+    char prev = statement.charAt(0);
+    char curr;
+    String variable = "" + prev;
+    String s = null;
+    boolean isAssignment = false;
+    for(int i = 1;i < statement.length();++i,prev=curr) {
+      curr = statement.charAt(i);
+      if(prev == ':' && curr == '=') {
+        isAssignment = true;
+        variable = variable.substring(0, variable.length() - 1);
+        s = "";
+        continue;
+      }
+      if(!isAssignment) {
+        variable += curr;
+      }
+      else {
+        s += curr;
+      }
+    }
+
+    if(!isAssignment) {
+      s = variable;
+      variable = null;
+    }
+
+    if(s != null) {
+      s = s.trim();
+    }
+    if(variable != null) {
+      variable = variable.trim();
+    }
+    return new StellarAssignment(variable, s);
+  }
+
+  /**
+   * Returns the key corresponding to this entry.
+   *
+   * @return the key corresponding to this entry
+   * @throws IllegalStateException implementations may, but are not
+   *                               required to, throw this exception if the entry has been
+   *                               removed from the backing map.
+   */
+  @Override
+  public String getKey() {
+    return variable;
+  }
+
+  /**
+   * Returns the value corresponding to this entry, which is the Stellar
+   * statement of the assignment.  There is no backing map behind this entry;
+   * the value is the statement supplied at construction.
+   *
+   * @return the value corresponding to this entry
+   * @throws IllegalStateException implementations may, but are not
+   *                               required to, throw this exception if the entry has been
+   *                               removed from the backing map.
+   */
+  @Override
+  public Object getValue() {
+    return statement;
+  }
+
+  /**
+   * Always throws {@link UnsupportedOperationException}: assignments are
+   * immutable, so the value of this entry cannot be replaced.  The remaining
+   * documentation below is inherited from the <tt>Map.Entry#setValue</tt>
+   * contract and does not describe additional behavior of this class.
+   *
+   * @param value new value to be stored in this entry
+   * @return old value corresponding to the entry
+   * @throws UnsupportedOperationException if the <tt>put</tt> operation
+   *                                       is not supported by the backing map
+   * @throws ClassCastException            if the class of the specified value
+   *                                       prevents it from being stored in the backing map
+   * @throws NullPointerException          if the backing map does not permit
+   *                                       null values, and the specified value is null
+   * @throws IllegalArgumentException      if some property of this value
+   *                                       prevents it from being stored in the backing map
+   * @throws IllegalStateException         implementations may, but are not
+   *                                       required to, throw this exception if the entry has been
+   *                                       removed from the backing map.
+   */
+  @Override
+  public String setValue(Object value) {
+    throw new UnsupportedOperationException("Assignments are immutable.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
index 4e24212..f13f1e3 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
@@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.StellarFunctionInfo;
 import org.apache.metron.common.dsl.functions.resolver.ClasspathFunctionResolver;
+import org.apache.metron.common.stellar.StellarAssignment;
 import org.apache.metron.common.utils.JSONUtils;
 import org.jboss.aesh.complete.CompleteOperation;
 import org.jboss.aesh.complete.Completion;
@@ -247,19 +248,17 @@ public class StellarShell extends AeshConsoleCallback implements Completion {
    */
   private void handleStellar(String expression) {
 
-    Iterable<String> assignmentSplit = Splitter.on(":=").split(expression);
     String stellarExpression = expression;
     String variable = null;
-    if(Iterables.size(assignmentSplit) == 2) {
-      //assignment
-      variable = Iterables.getFirst(assignmentSplit, null);
-      if(variable != null) {
-        variable = variable.trim();
-      }
-      stellarExpression = Iterables.getLast(assignmentSplit, null);
+    if(StellarAssignment.isAssignment(expression)) {
+      StellarAssignment expr = StellarAssignment.from(expression);
+      variable = expr.getVariable();
+      stellarExpression = expr.getStatement();
     }
-    if(!stellarExpression.isEmpty()) {
-      stellarExpression = stellarExpression.trim();
+    else {
+      if (!stellarExpression.isEmpty()) {
+        stellarExpression = stellarExpression.trim();
+      }
     }
     Object result = executeStellar(stellarExpression);
     if(result != null && variable == null) {

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentConfigTest.java
new file mode 100644
index 0000000..0883672
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentConfigTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.common.configuration.enrichment.handler.Configs;
+import org.apache.metron.common.utils.JSONUtils;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/** Verifies how the stellar configs from StellarEnrichmentTest are split by group and listed as subgroups. */
+public class StellarEnrichmentConfigTest extends StellarEnrichmentTest {
+
+  /** Loads {@code json} as an EnrichmentConfig and returns its "stellar" handler, asserting it is present. */
+  private static ConfigHandler stellarHandler(String json) throws IOException {
+    EnrichmentConfig parsed = JSONUtils.INSTANCE.load(json, EnrichmentConfig.class);
+    ConfigHandler stellar = parsed.getEnrichmentConfigs().get("stellar");
+    Assert.assertNotNull(stellar);
+    return stellar;
+  }
+
+  @Test
+  public void testSplitter_default() throws IOException {
+    JSONObject message = getMessage();
+    for(String json : DEFAULT_CONFIGS) {
+      ConfigHandler handler = stellarHandler(json);
+      List<JSONObject> parts = Configs.STELLAR.splitByFields(message, null, x -> null, handler);
+      // ungrouped statements all arrive in one split, keyed by ""
+      Assert.assertEquals(1, parts.size());
+      Map<String, Object> fields = (Map<String, Object>) parts.get(0).get("");
+      Assert.assertEquals(3, fields.size());
+      Assert.assertEquals("stellar_test", fields.get("source.type"));
+      Assert.assertEquals("foo", fields.get("string"));
+      Assert.assertNull(fields.get("stmt1"));
+    }
+  }
+
+  @Test
+  public void testGetSubgroups_default() throws IOException {
+    for(String json : DEFAULT_CONFIGS) {
+      ConfigHandler handler = stellarHandler(json);
+      List<String> names = Configs.STELLAR.getSubgroups(handler);
+      Assert.assertEquals("", names.get(0));
+      Assert.assertEquals(1, names.size());
+    }
+  }
+
+  @Test
+  public void testSplitter_grouped() throws IOException {
+    JSONObject message = getMessage();
+    for(String json : GROUPED_CONFIGS) {
+      ConfigHandler handler = stellarHandler(json);
+      List<JSONObject> parts = Configs.STELLAR.splitByFields(message, null, x -> null, handler);
+      Assert.assertEquals(2, parts.size());
+      {
+        Map<String, Object> fields = (Map<String, Object>) parts.get(0).get("group1");
+        Assert.assertEquals(2, fields.size());
+        Assert.assertEquals("stellar_test", fields.get("source.type"));
+        Assert.assertNull(fields.get("stmt1"));
+      }
+      {
+        Map<String, Object> fields = (Map<String, Object>) parts.get(1).get("group2");
+        Assert.assertEquals(1, fields.size());
+        Assert.assertEquals("foo", fields.get("string"));
+      }
+    }
+  }
+
+  @Test
+  public void testGetSubgroups_grouped() throws IOException {
+    for(String json : GROUPED_CONFIGS) {
+      ConfigHandler handler = stellarHandler(json);
+      List<String> names = Configs.STELLAR.getSubgroups(handler);
+      Assert.assertEquals("group1", names.get(0));
+      Assert.assertEquals("group2", names.get(1));
+      Assert.assertEquals(2, names.size());
+    }
+  }
+
+
+  @Test
+  public void testSplitter_mixed() throws IOException {
+    JSONObject message = getMessage();
+    for(String json : Iterables.concat(MIXED_CONFIGS, ImmutableList.of(tempVarStellarConfig_list))) {
+      ConfigHandler handler = stellarHandler(json);
+      List<JSONObject> parts = Configs.STELLAR.splitByFields(message, null, x -> null, handler);
+      Assert.assertEquals(3, parts.size());
+      // first split: the fields for "group1"
+      {
+        Map<String, Object> fields = (Map<String, Object>) parts.get(0).get("group1");
+        Assert.assertEquals(2, fields.size());
+        Assert.assertEquals("stellar_test", fields.get("source.type"));
+        Assert.assertNull(fields.get("stmt1"));
+      }
+      // second split: the fields for "group2"
+      {
+        Map<String, Object> fields = (Map<String, Object>) parts.get(1).get("group2");
+        Assert.assertEquals(1, fields.size());
+        Assert.assertEquals("foo", fields.get("string"));
+      }
+      // last split: the ungrouped statements, keyed by ""
+      {
+        Map<String, Object> fields = (Map<String, Object>) parts.get(2).get("");
+        Assert.assertEquals(1, fields.size());
+        Assert.assertEquals("stellar_test", fields.get("source.type"));
+      }
+    }
+  }
+
+  @Test
+  public void testGetSubgroups_mixed() throws IOException {
+    for(String json : MIXED_CONFIGS) {
+      ConfigHandler handler = stellarHandler(json);
+      List<String> names = Configs.STELLAR.getSubgroups(handler);
+      Assert.assertEquals("group1", names.get(0));
+      Assert.assertEquals("group2", names.get(1));
+      Assert.assertEquals("", names.get(2));
+      Assert.assertEquals(3, names.size());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentTest.java
new file mode 100644
index 0000000..3af15cd
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/StellarEnrichmentTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.JSONUtils;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class StellarEnrichmentTest { // Fixtures for the stellar enrichment tests. NOTE(review): @Multiline presumably fills each String from the Javadoc above it, so treat those comment bodies as data — confirm
+  /**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "stmt1" : "TO_UPPER(source.type)",
+          "stmt2" : "TO_LOWER(stmt1)",
+          "stmt3" : "TO_LOWER(string)"
+        }
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String defaultStellarConfig_map; // ungrouped statements, map syntax
+
+  /**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : [
+          "stmt1 := TO_UPPER(source.type)",
+          "stmt2 := TO_LOWER(stmt1)",
+          "stmt3 := TO_LOWER(string)"
+        ]
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String defaultStellarConfig_list; // same statements as a list of "var := expr" strings
+  public static List<String> DEFAULT_CONFIGS = ImmutableList.of(defaultStellarConfig_list, defaultStellarConfig_map); // both syntaxes of the ungrouped config
+
+/**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "group1" : {
+            "stmt1" : "TO_UPPER(source.type)",
+            "stmt2" : "TO_LOWER(stmt1)"
+          },
+          "group2" : {
+            "stmt3" : "TO_LOWER(string)"
+          }
+        }
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String groupedStellarConfig_map; // statements split into group1/group2, map syntax
+
+  /**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "group1" : [
+            "stmt1 := TO_UPPER(source.type)",
+            "stmt2 := TO_LOWER(stmt1)"
+          ],
+          "group2" : [
+            "stmt3 := TO_LOWER(string)"
+          ]
+        }
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String groupedStellarConfig_list; // same groups, each written as a list
+  public static List<String> GROUPED_CONFIGS = ImmutableList.of(groupedStellarConfig_list, groupedStellarConfig_map); // both syntaxes of the grouped config
+
+  /**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "group1" : {
+            "stmt1" : "TO_UPPER(source.type)",
+            "stmt2" : "TO_LOWER(stmt1)"
+          },
+          "group2" : {
+            "stmt3" : "TO_LOWER(string)"
+          },
+          "stmt4" : "1 + 1",
+          "stmt5" : "FORMAT('%s', source.type)"
+        }
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String mixedStellarConfig_map; // two groups plus ungrouped stmt4/stmt5, groups in map syntax
+
+  /**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "group1" : [
+            "stmt1 := TO_UPPER(source.type)",
+            "stmt2 := TO_LOWER(stmt1)"
+          ],
+          "group2" : [
+            "stmt3 := TO_LOWER(string)"
+          ],
+          "stmt4" : "1 + 1",
+          "stmt5" : "FORMAT('%s', source.type)"
+        }
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String mixedStellarConfig_list; // same, with the groups in list syntax
+  public static List<String> MIXED_CONFIGS = ImmutableList.of(mixedStellarConfig_list, mixedStellarConfig_map); // both syntaxes of the mixed config
+
+  /**
+   {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "group1" : [
+            "stmt1 := TO_UPPER(source.type)",
+            "stmt2 := TO_LOWER(stmt1)",
+            "stmt1 := null"
+          ],
+          "group2" : [
+            "stmt3 := TO_LOWER(string)"
+          ],
+          "stmt4" : "1 + 1",
+          "stmt5" : "FORMAT('%s', source.type)"
+        }
+      }
+    }
+  }
+   */
+  @Multiline
+  public static String tempVarStellarConfig_list; // mixed list config that also resets stmt1 to null at the end of group1
+
+  /**
+   {
+    "string" : "foo"
+   ,"number" : 2
+   ,"source.type" : "stellar_test"
+   }
+   */
+  @Multiline
+  public static String message; // sample input message
+
+  public static JSONObject getMessage() throws IOException { // parses the message fixture into a JSONObject
+    Map<String, Object> ret = JSONUtils.INSTANCE.load(message, new TypeReference<Map<String, Object>>() {
+    });
+    return new JSONObject(ret);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarAssignmentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarAssignmentTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarAssignmentTest.java
new file mode 100644
index 0000000..c4a6060
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarAssignmentTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.stellar;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Exercises parsing of {@code var := expr} statements by StellarAssignment. */
+public class StellarAssignmentTest {
+  /** Whitespace around the variable and the expression must not leak into the parsed parts. */
+  @Test
+  public void testAssignment() {
+    ImmutableList<String> inputs = ImmutableList.of( "foo := bar + grok"
+                                                   , "foo   := bar + grok"
+                                                   , "foo := bar + grok   "
+                                                   );
+    for(String input : inputs) {
+      StellarAssignment parsed = StellarAssignment.from(input);
+      Assert.assertEquals("foo", parsed.getKey());
+      Assert.assertEquals("foo", parsed.getVariable());
+      Assert.assertEquals("bar + grok", parsed.getStatement());
+      Assert.assertEquals("bar + grok", parsed.getValue());
+    }
+  }
+
+  /** Without {@code :=} there is no variable; the trimmed input is the statement. */
+  @Test
+  public void testNonAssignment() {
+    ImmutableList<String> inputs = ImmutableList.of( "bar + grok"
+                                                   , "  bar + grok"
+                                                   , "bar + grok   "
+                                                   );
+    for(String input : inputs) {
+      StellarAssignment parsed = StellarAssignment.from(input);
+      Assert.assertNull(parsed.getKey());
+      Assert.assertNull(parsed.getVariable());
+      Assert.assertEquals("bar + grok", parsed.getStatement());
+      Assert.assertEquals("bar + grok", parsed.getValue());
+    }
+  }
+
+  /** {@code setValue} is expected to be rejected with UnsupportedOperationException. */
+  @Test(expected=UnsupportedOperationException.class)
+  public void testImmutability() {
+    StellarAssignment.from("foo := bar").setValue("myval");
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-enrichment/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md
index a789f0f..e9d4dde 100644
--- a/metron-platform/metron-enrichment/README.md
+++ b/metron-platform/metron-enrichment/README.md
@@ -54,7 +54,9 @@ The `config` map is intended to house enrichment specific configuration.
 For instance, for the `hbaseEnrichment`, the mappings between the
 enrichment types to the column families is specified.
 
-The `fieldMap`contents are of interest because they contain the routing and configuration information for the enrichments.  When we say 'routing', we mean how the messages get split up and sent to the enrichment adapter bolts.  The simplest, by far, is just providing a simple list as in
+The `fieldMap` contents are of interest because they contain the routing and configuration information for the enrichments.  
+When we say 'routing', we mean how the messages get split up and sent to the enrichment adapter bolts.  
+The simplest, by far, is just providing a simple list as in
 ```
     "fieldMap": {
       "geo": [
@@ -71,40 +73,94 @@ The `fieldMap`contents are of interest because they contain the routing and conf
       ]
       }
 ```
-Based on this sample config, both ip_src_addr and ip_dst_addr will go to the `geo`, `host`, and `hbaseEnrichment` adapter bolts. For the `geo`, `host` and `hbaseEnrichment`, this is sufficient.  However, more complex enrichments may contain their own configuration.  Currently, the `stellar` enrichment requires a more complex configuration, such as:
+Based on this sample config, both `ip_src_addr` and `ip_dst_addr` will go to the `geo`, `host`, and 
+`hbaseEnrichment` adapter bolts. 
+ 
+#### Stellar Enrichment Configuration
+For the `geo`, `host` and `hbaseEnrichment`, this is sufficient. However, more complex enrichments 
+may contain their own configuration.  Currently, the `stellar` enrichment is more adaptable and thus
+requires a more nuanced configuration.
+
+At its most basic, we want to take a message and apply a couple of enrichments, such as converting the
+`hostname` field to lowercase. We do this by specifying the transformation inside of the 
+`config` for the `stellar` fieldMap.  There are two syntaxes that are supported, specifying the transformations
+as a map with the key as the field and the value the stellar expression:
 ```
     "fieldMap": {
        ...
       "stellar" : {
         "config" : {
-          "numeric" : {
-                      "foo": "1 + 1"
-                      }
-          ,"ALL_CAPS" : "TO_UPPER(source.type)"
+          "hostname" : "TO_LOWER(hostname)"
         }
       }
     }
 ```
 
-Whereas the simpler enrichments just need a set of fields explicitly stated so they can be separated from the message and sent to the enrichment adapter bolt for enrichment and ultimately joined back in the join bolt, the stellar enrichment has its set of required fields implicitly stated through usage.  For instance, if your stellar statement references a field, it should be included and if not, then it should not be included.  We did not want to require users to make explicit the implicit.
+Another approach is to make the transformations as a list with the same `var := expr` syntax as is used
+in the Stellar REPL, such as:
+```
+    "fieldMap": {
+       ...
+      "stellar" : {
+        "config" : [
+          "hostname := TO_LOWER(hostname)"
+        ]
+      }
+    }
+```
+
+Sometimes arbitrary stellar enrichments may take enough time that you would prefer to split some of them
+into groups and execute the groups of stellar enrichments in parallel.  Take, for instance, if you wanted
+to do an HBase enrichment and a profiler call which were independent of one another.  This use case is 
+supported by splitting the enrichments up as groups.
 
-The other way in which the stellar enrichment is somewhat more complex is in how the statements are executed.  In the general purpose case for a list of fields, those fields are used to create a message to send to the enrichment adapter bolt and that bolt's worker will handle the fields one by one in serial for a given message.  For stellar enrichment, we wanted to have a more complex design so that users could specify the groups of stellar statements sent to the same worker in the same message (and thus executed sequentially).  Consider the following configuration:
+Consider the following example:
 ```
     "fieldMap": {
+       ...
       "stellar" : {
         "config" : {
-          "numeric" : {
-                      "foo": "1 + 1"
-                      "bar" : TO_LOWER(source.type)"
-                      }
-         ,"text" : {
-                   "ALL_CAPS" : "TO_UPPER(source.type)"
-                   }
+          "malicious_domain_enrichment" : {
+            "is_bad_domain" : "ENRICHMENT_EXISTS('malicious_domains', ip_dst_addr, 'enrichments', 'cf')"
+          },
+          "login_profile" : [
+            "profile_window := PROFILE_WINDOW('from 6 months ago')", 
+            "global_login_profile := PROFILE_GET('distinct_login_attempts', 'global', profile_window)",
+            "stats := STATS_MERGE(global_login_profile)",
+            "auth_attempts_median := STATS_PERCENTILE(stats, 0.5)", 
+            "auth_attempts_sd := STATS_SD(stats)",
+            "profile_window := null", 
+            "global_login_profile := null", 
+            "stats := null"
+          ]
         }
       }
     }
 ```
-We have a group called `numeric` whose stellar statements will be executed sequentially.  In parallel to that, we have the group of stellar statements under the group `text` executing.  The intent here is to allow you to not force higher latency operations to be done sequentially. You can use any name for your groupings you like. Be aware that the configuration is a map and duplicate configuration keys' values are not combined, so the duplicate configuration value will be overwritten.
+
+Here we want to perform two enrichments that hit HBase and we would rather not run in sequence.  These
+enrichments are entirely independent of one another (i.e. neither relies on the output of the other).  In
+this case, we've created a group called `malicious_domain_enrichment` to inquire about whether the destination
+address exists in the HBase enrichment table in the `malicious_domains` enrichment type.  This is a simple
+enrichment, so we can express the enrichment group as a map with the new field `is_bad_domain` being a key
+and the stellar expression associated with that operation being the associated value.
+
+In contrast, the stellar enrichment group `login_profile` interacts with the profiler and has multiple temporary
+expressions (i.e. `profile_window`, `global_login_profile`, and `stats`) that are useful only within the context
+of this group of stellar expressions.  In this case, we would need to ensure that we use the list construct
+when specifying the group and remember to set the temporary variables to `null` so they are not passed along.
+
+In general, things to note from this section are as follows:
+* The stellar enrichments for the `stellar` enrichment adapter are specified in the `config` for the `stellar` enrichment
+adapter in the `fieldMap`
+* Groups that are independent of one another (i.e. no expression in any group depends on the output of an expression from another group) may be executed in parallel
+* If you have the need to use temporary variables, you may use the list construct.  Ensure that you assign the variables to `null` before the end of the group.
+* **Ensure that you do not assign a field to a stellar expression which returns an object which JSON cannot represent.**
+* Fields assigned to Maps as part of stellar enrichments have the maps unfolded, similar to the HBase Enrichment
+  * For example the stellar enrichment for field `foo` which assigns a map such as `foo := { 'grok' : 1, 'bar' : 'baz'}`
+  would yield the following fields:
+    * `foo.grok` == `1`
+    * `foo.bar` == `'baz'`
 
 ### The `threatIntel` Configuration
 
@@ -117,7 +173,8 @@ We have a group called `numeric` whose stellar statements will be executed seque
 
 The `config` map is intended to house threat intel specific configuration.
 For instance, for the `hbaseThreatIntel` threat intel adapter, the mappings between the
-enrichment types to the column families is specified.
+enrichment types to the column families is specified.  The `fieldMap` configuration is similar to the `enrichment`
+configuration in that the adapters available are the same.
 
 The `triageConfig` field is also a complex field and it bears some description:
 

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index 29de74f..d525a64 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -40,6 +40,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-storm-kafka</artifactId>
             <version>${project.parent.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
index bc5f2b6..53ef16f 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
@@ -24,6 +24,7 @@ import org.apache.metron.common.dsl.MapVariableResolver;
 import org.apache.metron.common.dsl.StellarFunctions;
 import org.apache.metron.common.dsl.VariableResolver;
 import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.ConversionUtils;
 import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
@@ -37,7 +38,11 @@ import java.util.function.Function;
 import static org.apache.metron.enrichment.bolt.GenericEnrichmentBolt.STELLAR_CONTEXT_CONF;
 
 public class StellarAdapter implements EnrichmentAdapter<CacheKey>,Serializable {
+  public static class Perf {} // empty marker type used to name the perf logger below
   protected static final Logger _LOG = LoggerFactory.getLogger(StellarAdapter.class);
+  protected static final Logger _PERF_LOG = LoggerFactory.getLogger(Perf.class); // receives the slow-statement reports
+  public static final String STELLAR_SLOW_LOG = "stellar.slow.threshold.ms"; // global-config key holding the slow threshold
+  public static final Long STELLAR_SLOW_LOG_DEFAULT = 1000L; // threshold in ms used when the key is absent
 
   private enum EnrichmentType implements Function<SensorEnrichmentConfig, ConfigHandler>{
     ENRICHMENT(config -> config.getEnrichment().getEnrichmentConfigs().get("stellar"))
@@ -74,6 +79,63 @@ public class StellarAdapter implements EnrichmentAdapter<CacheKey>,Serializable
     return field;
   }
 
+  public static Iterable<Map.Entry<String, Object>> getStellarStatements(ConfigHandler handler, String field) {
+    if(field.length() == 0) {
+      return handler.getType().toConfig(handler.getConfig());
+    }
+    else {
+      Map<String, Object> groupStatements = (Map<String, Object>)handler.getConfig();
+      return handler.getType().toConfig(groupStatements.get(field));
+    }
+  }
+
+  /**
+   * Runs the statements configured for {@code field} in order, writing each result into both
+   * {@code message} and the returned object. A Map result is unfolded into {@code key.subkey}
+   * fields and a null result removes the field. Slow statements go to the perf log at debug.
+   */
+  public static JSONObject process( Map<String, Object> message, ConfigHandler handler, String field
+                                  , Long slowLogThreshold, StellarProcessor processor
+                                  , VariableResolver resolver, Context stellarContext
+                                  ) {
+    JSONObject ret = new JSONObject();
+    Iterable<Map.Entry<String, Object>> stellarStatements = getStellarStatements(handler, field);
+
+    if(stellarStatements != null) {
+      for (Map.Entry<String, Object> kv : stellarStatements) {
+        if(kv.getKey() != null && kv.getValue() != null) {
+          if (kv.getValue() instanceof String) { // non-String values are skipped
+            long startTime = System.currentTimeMillis();
+            String stellarStatement = (String) kv.getValue();
+            Object o = processor.parse(stellarStatement, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
+            if (slowLogThreshold != null && _PERF_LOG.isDebugEnabled()) {
+              long duration = System.currentTimeMillis() - startTime;
+              if (duration > slowLogThreshold) {
+                _PERF_LOG.debug("SLOW LOG: {} took {}ms", stellarStatement, duration);
+              }
+            }
+            if (o instanceof Map) { // instanceof is already false for null
+              for (Map.Entry<Object, Object> valueKv : ((Map<Object, Object>) o).entrySet()) {
+                String newKey = ((kv.getKey().length() > 0) ? kv.getKey() + "." : "") + valueKv.getKey();
+                message.put(newKey, valueKv.getValue());
+                ret.put(newKey, valueKv.getValue());
+              }
+            }
+            else if(o == null) { // a null result drops the field, e.g. "var := null"
+              message.remove(kv.getKey());
+              ret.remove(kv.getKey());
+            }
+            else {
+              message.put(kv.getKey(), o);
+              ret.put(kv.getKey(), o);
+            }
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
   @Override
   public JSONObject enrich(CacheKey value) {
     Context stellarContext = (Context) value.getConfig().getConfiguration().get(STELLAR_CONTEXT_CONF);
@@ -81,33 +143,25 @@ public class StellarAdapter implements EnrichmentAdapter<CacheKey>,Serializable
     Map<String, Object> globalConfig = value.getConfig().getConfiguration();
     Map<String, Object> sensorConfig = value.getConfig().getEnrichment().getConfig();
     if(handler == null) {
-        _LOG.trace("Stellar ConfigHandler is null.");
+      _LOG.trace("Stellar ConfigHandler is null.");
       return new JSONObject();
     }
+    Long slowLogThreshold = null;
+    if(_PERF_LOG.isDebugEnabled()) {
+      slowLogThreshold = ConversionUtils.convert(globalConfig.getOrDefault(STELLAR_SLOW_LOG, STELLAR_SLOW_LOG_DEFAULT), Long.class);
+    }
     Map<String, Object> message = value.getValue(Map.class);
     VariableResolver resolver = new MapVariableResolver(message, sensorConfig, globalConfig);
     StellarProcessor processor = new StellarProcessor();
-    Map<String, Object> stellarStatements = value.getField().length() == 0? handler.getConfig()
-                                                                          : (Map)handler.getConfig().get(value.getField());
-    if(stellarStatements != null) {
-      for (Map.Entry<String, Object> kv : stellarStatements.entrySet()) {
-        if(kv.getValue() instanceof String) {
-          String stellarStatement = (String) kv.getValue();
-          Object o = processor.parse(stellarStatement, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
-          if(o != null && o instanceof Map) {
-            for(Map.Entry<Object, Object> valueKv : ((Map<Object, Object>)o).entrySet()) {
-              String newKey = ((kv.getKey().length() > 0)?kv.getKey() + "." : "" )+ valueKv.getKey();
-              message.put(newKey, valueKv.getValue());
-            }
-          }
-          else {
-            message.put(kv.getKey(), o);
-          }
-        }
-      }
-    }
-    JSONObject enriched = new JSONObject(message);
-    _LOG.trace("Stellar Enrichment Success: " + enriched);
+    JSONObject enriched = process(message
+                                 , handler
+                                 , value.getField()
+                                 , slowLogThreshold
+                                 , processor
+                                 , resolver
+                                 , stellarContext
+                                 );
+    _LOG.trace("Stellar Enrichment Success: {}", enriched);
     return enriched;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 4b88399..61e0b22 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -57,7 +57,8 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
     if(fieldMap != null) {
       for (String enrichmentType : fieldMap.keySet()) {
         ConfigHandler handler = handlerMap.get(enrichmentType);
-        for(String subgroup : handler.getType().getSubgroups(handler.getConfig())) {
+        List<String> subgroups = handler.getType().getSubgroups(handler.getType().toConfig(handler.getConfig()));
+        for(String subgroup : subgroups) {
           streamIds.add(Joiner.on(":").join(enrichmentType, subgroup));
         }
       }

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index f9cad80..6529296 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -124,7 +124,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
               .splitByFields( message
                       , fields
                       , field -> getKeyName(enrichmentType, field)
-                      , retriever.getConfig()
+                      , retriever
               );
       for(JSONObject eo : enrichmentObject) {
         eo.put(Constants.SENSOR_TYPE, sensorType);

http://git-wip-us.apache.org/repos/asf/metron/blob/7d6121bf/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapterTest.java
new file mode 100644
index 0000000..5624d4b
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapterTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.stellar;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.common.configuration.StellarEnrichmentTest;
+import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.MapVariableResolver;
+import org.apache.metron.common.dsl.VariableResolver;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.JSONUtils;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StellarAdapterTest extends StellarEnrichmentTest { // Checks StellarAdapter.process output for several stellar enrichment configs; DEFAULT_CONFIGS, GROUPED_CONFIGS, MIXED_CONFIGS, tempVarStellarConfig_list and getMessage() are not declared here -- presumably inherited from StellarEnrichmentTest (confirm)
+  StellarProcessor processor = new StellarProcessor(); // single processor instance passed to every StellarAdapter.process call made by enrich()
+
+  private JSONObject enrich(JSONObject message, String field, ConfigHandler handler) { // helper: runs StellarAdapter.process for one field/group name and returns whatever it returns
+    VariableResolver resolver = new MapVariableResolver(message); // resolver is constructed directly over the message map
+    return StellarAdapter.process( message
+                                 , handler
+                                 , field // "" or a group name such as "group1" in the tests below
+                                 , 1000L // NOTE(review): presumably the slow-log threshold argument of process; meaning and unit are not visible here -- confirm
+                                 , processor
+                                 , resolver
+                                 , Context.EMPTY_CONTEXT() // no Stellar context capabilities are supplied
+                                 );
+  }
+
+  @Test
+  public void test_default() throws Exception { // every config in DEFAULT_CONFIGS, enriched with field "", must yield exactly stmt1, stmt2 and stmt3
+    for(String c : DEFAULT_CONFIGS) {
+      JSONObject message = getMessage(); // a message is obtained anew for each config variant
+      EnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(c, EnrichmentConfig.class); // c is parsed as a JSON enrichment config
+      Assert.assertNotNull(enrichmentConfig.getEnrichmentConfigs().get("stellar")); // the config must define a "stellar" handler
+      ConfigHandler handler = enrichmentConfig.getEnrichmentConfigs().get("stellar");
+      JSONObject enriched = enrich(message, "", handler);
+      Assert.assertEquals("STELLAR_TEST", enriched.get("stmt1"));
+      Assert.assertEquals("stellar_test", enriched.get("stmt2"));
+      Assert.assertEquals("foo", enriched.get("stmt3"));
+      Assert.assertEquals(3, enriched.size()); // nothing besides the three statements is present in the result
+    }
+  }
+
+  @Test
+  public void test_grouped() throws Exception { // every config in GROUPED_CONFIGS: "group1" yields stmt1 + stmt2 only, "group2" yields stmt3 only
+    for(String c : GROUPED_CONFIGS) {
+      JSONObject message = getMessage();
+      EnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(c, EnrichmentConfig.class);
+      Assert.assertNotNull(enrichmentConfig.getEnrichmentConfigs().get("stellar"));
+      ConfigHandler handler = enrichmentConfig.getEnrichmentConfigs().get("stellar");
+      { // group1
+        JSONObject enriched = enrich(message, "group1", handler);
+        Assert.assertEquals("STELLAR_TEST", enriched.get("stmt1"));
+        Assert.assertEquals("stellar_test", enriched.get("stmt2"));
+        Assert.assertEquals(2, enriched.size());
+      }
+      { // group2
+        JSONObject enriched = enrich(message, "group2", handler);
+        Assert.assertEquals("foo", enriched.get("stmt3"));
+        Assert.assertEquals(1, enriched.size());
+      }
+    }
+  }
+
+  @Test
+  public void test_mixed() throws Exception { // every config in MIXED_CONFIGS: same group expectations as test_grouped, plus stmt4 + stmt5 under the "" field
+    for(String c : MIXED_CONFIGS) {
+      JSONObject message = getMessage();
+      EnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(c, EnrichmentConfig.class);
+      Assert.assertNotNull(enrichmentConfig.getEnrichmentConfigs().get("stellar"));
+      ConfigHandler handler = enrichmentConfig.getEnrichmentConfigs().get("stellar");
+      { // group1
+        JSONObject enriched = enrich(message, "group1", handler);
+        Assert.assertEquals("STELLAR_TEST", enriched.get("stmt1"));
+        Assert.assertEquals("stellar_test", enriched.get("stmt2"));
+        Assert.assertEquals(2, enriched.size());
+      }
+      { // group2
+        JSONObject enriched = enrich(message, "group2", handler);
+        Assert.assertEquals("foo", enriched.get("stmt3"));
+        Assert.assertEquals(1, enriched.size());
+      }
+      { // "" field: presumably the statements declared outside any group -- confirm against MIXED_CONFIGS
+        JSONObject enriched = enrich(message, "", handler);
+        Assert.assertEquals(2, enriched.get("stmt4"));
+        Assert.assertEquals("stellar_test", enriched.get("stmt5"));
+        Assert.assertEquals(2, enriched.size());
+      }
+    }
+  }
+
+  @Test
+  public void test_tempVariable() throws Exception { // single config (tempVarStellarConfig_list): group1 must expose only stmt2, unlike test_grouped/test_mixed where stmt1 is also present
+    JSONObject message = getMessage();
+    EnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(tempVarStellarConfig_list, EnrichmentConfig.class);
+    Assert.assertNotNull(enrichmentConfig.getEnrichmentConfigs().get("stellar"));
+    ConfigHandler handler = enrichmentConfig.getEnrichmentConfigs().get("stellar");
+    { // group1 -- NOTE(review): presumably stmt1 is a temporary variable dropped from the output by this config; confirm against tempVarStellarConfig_list
+      JSONObject enriched = enrich(message, "group1", handler);
+      Assert.assertEquals("stellar_test", enriched.get("stmt2"));
+      Assert.assertEquals(1, enriched.size());
+    }
+    { // group2
+      JSONObject enriched = enrich(message, "group2", handler);
+      Assert.assertEquals("foo", enriched.get("stmt3"));
+      Assert.assertEquals(1, enriched.size());
+    }
+    { // "" field
+      JSONObject enriched = enrich(message, "", handler);
+      Assert.assertEquals(2, enriched.get("stmt4"));
+      Assert.assertEquals("stellar_test", enriched.get("stmt5"));
+      Assert.assertEquals(2, enriched.size());
+    }
+  }
+}


Mime
View raw message