metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [2/3] metron git commit: METRON-739 Create Local Profile Runner (nickwallen) closes apache/metron#693
Date Tue, 15 Aug 2017 21:33:54 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
new file mode 100644
index 0000000..41c31d7
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
@@ -0,0 +1,332 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.WallClock;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for building and maintaining a Profile.
+ *
+ * One or more messages are applied to the Profile with `apply` and a profile measurement is
+ * produced by calling `flush`.
+ *
+ * Any one instance is responsible only for building the profile for a specific [profile, entity]
+ * pairing.  There will exist many instances, one for each [profile, entity] pair that exists
+ * within the incoming telemetry data applied to the profile.
+ */
+public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The name of the profile.
+   */
+  private String profileName;
+
+  /**
+   * The name of the entity.
+   */
+  private String entity;
+
+  /**
+   * The definition of the Profile that the bolt is building.
+   */
+  private ProfileConfig definition;
+
+  /**
+   * Executes Stellar code and maintains state across multiple invocations.
+   */
+  private StellarStatefulExecutor executor;
+
+  /**
+   * Has the profile been initialized?
+   */
+  private boolean isInitialized;
+
+  /**
+   * The duration of each period in milliseconds.
+   */
+  private long periodDurationMillis;
+
+  /**
+   * A clock is used to tell time; imagine that.
+   */
+  private Clock clock;
+
+  /**
+   * Use the ProfileBuilder.Builder to create a new ProfileBuilder.
+   */
+  private DefaultProfileBuilder(ProfileConfig definition,
+                                String entity,
+                                Clock clock,
+                                long periodDurationMillis,
+                                Context stellarContext) {
+
+    this.isInitialized = false;
+    this.definition = definition;
+    this.profileName = definition.getProfile();
+    this.entity = entity;
+    this.clock = clock;
+    this.periodDurationMillis = periodDurationMillis;
+    this.executor = new DefaultStellarStatefulExecutor();
+    StellarFunctions.initialize(stellarContext);
+    this.executor.setContext(stellarContext);
+  }
+
+  /**
+   * Apply a message to the profile.
+   * @param message The message to apply.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void apply(JSONObject message) {
+
+    if(!isInitialized()) {
+      assign(definition.getInit(), message, "init");
+      isInitialized = true;
+    }
+
+    assign(definition.getUpdate(), message, "update");
+  }
+
+  /**
+   * Flush the Profile.
+   *
+   * Completes and emits the ProfileMeasurement.  Clears all state in preparation for
+   * the next window period.
+   *
+   * @return Returns the completed profile measurement.
+   */
+  @Override
+  public ProfileMeasurement flush() {
+    LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
+
+    // execute the 'profile' expression(s)
+    @SuppressWarnings("unchecked")
+    Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile");
+
+    // execute the 'triage' expression(s)
+    Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                    e -> e.getKey(),
+                    e -> execute(e.getValue(), "result/triage")));
+
+    // execute the 'groupBy' expression(s) - can refer to value of 'result' expression
+    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy");
+
+    isInitialized = false;
+    return new ProfileMeasurement()
+            .withProfileName(profileName)
+            .withEntity(entity)
+            .withGroups(groups)
+            .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS)
+            .withProfileValue(profileValue)
+            .withTriageValues(triageValues)
+            .withDefinition(definition);
+  }
+
+  /**
+   * Returns the current value of a variable.
+   * @param variable The name of the variable.
+   */
+  @Override
+  public Object valueOf(String variable) {
+    return executor.getState().get(variable);
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  @Override
+  public ProfileConfig getDefinition() {
+    return definition;
+  }
+
+  /**
+   * Executes an expression contained within the profile definition.
+   * @param expression The expression to execute.
+   * @param transientState Additional transient state provided to the expression.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing the expression.
+   */
+  private Object execute(String expression, Map<String, Object> transientState, String expressionType) {
+    Object result = null;
+
+    List<Object> allResults = execute(Collections.singletonList(expression), transientState, expressionType);
+    if(allResults.size() > 0) {
+      result = allResults.get(0);
+    }
+
+    return result;
+  }
+
+  /**
+   * Executes an expression contained within the profile definition.
+   * @param expression The expression to execute.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing the expression.
+   */
+  private Object execute(String expression, String expressionType) {
+    return execute(expression, Collections.emptyMap(), expressionType);
+  }
+
+
+  /**
+   * Executes a set of expressions whose results need to be assigned to a variable.
+   * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
+   * @param transientState Additional transient state provided to the expression.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   */
+  private void assign(Map<String, String> expressions, Map<String, Object> transientState, String expressionType) {
+    try {
+
+      // execute each of the 'update' expressions
+      MapUtils.emptyIfNull(expressions)
+              .forEach((var, expr) -> executor.assign(var, expr, transientState));
+
+    } catch(ParseException e) {
+
+      // make it brilliantly clear that one of the 'update' expressions is bad
+      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
+      throw new ParseException(msg, e);
+    }
+  }
+
+  /**
+   * Executes the expressions contained within the profile definition.
+   * @param expressions A list of expressions to execute.
+   * @param transientState Additional transient state provided to the expressions.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing each expression.
+   */
+  private List<Object> execute(List<String> expressions, Map<String, Object> transientState, String expressionType) {
+    List<Object> results = new ArrayList<>();
+
+    try {
+      ListUtils.emptyIfNull(expressions)
+              .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class)));
+
+    } catch (Throwable e) {
+      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
+      throw new ParseException(msg, e);
+    }
+
+    return results;
+  }
+
+  /**
+   * A builder used to construct a new ProfileBuilder.
+   */
+  public static class Builder {
+
+    private ProfileConfig definition;
+    private String entity;
+    private long periodDurationMillis;
+    private Clock clock = new WallClock();
+    private Context context;
+
+    public Builder withContext(Context context) {
+      this.context = context;
+      return this;
+    }
+
+    public Builder withClock(Clock clock) {
+      this.clock = clock;
+      return this;
+    }
+
+    /**
+     * @param definition The profiler definition.
+     */
+    public Builder withDefinition(ProfileConfig definition) {
+      this.definition = definition;
+      return this;
+    }
+
+    /**
+     * @param entity The name of the entity
+     */
+    public Builder withEntity(String entity) {
+      this.entity = entity;
+      return this;
+    }
+
+    /**
+     * @param duration The duration of each profile period.
+     * @param units The units used to specify the duration of the profile period.
+     */
+    public Builder withPeriodDuration(long duration, TimeUnit units) {
+      this.periodDurationMillis = units.toMillis(duration);
+      return this;
+    }
+
+    /**
+     * @param millis The duration of each profile period in milliseconds.
+     */
+    public Builder withPeriodDurationMillis(long millis) {
+      this.periodDurationMillis = millis;
+      return this;
+    }
+
+    /**
+     * Construct a ProfileBuilder.
+     */
+    public ProfileBuilder build() {
+
+      if(definition == null) {
+        throw new IllegalArgumentException("missing profiler definition; got null");
+      }
+      if(StringUtils.isEmpty(entity)) {
+        throw new IllegalArgumentException(format("missing entity name; got '%s'", entity));
+      }
+
+      return new DefaultProfileBuilder(definition, entity, clock, periodDurationMillis, context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
new file mode 100644
index 0000000..a60446f
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
@@ -0,0 +1,56 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Distributes a message along a MessageRoute.  A MessageRoute will lead to one or
+ * more ProfileBuilders.
+ *
+ * A ProfileBuilder is responsible for maintaining the state of a single profile,
+ * for a single entity.  There will be one ProfileBuilder for each (profile, entity) pair.
+ * This class ensures that each ProfileBuilder receives the telemetry messages that
+ * it needs.
+ */
+public interface MessageDistributor {
+
+  /**
+   * Distribute a message along a MessageRoute.
+   *
+   * @param message The message that needs distributed.
+   * @param route The message route.
+   * @param context The Stellar execution context.
+   * @throws ExecutionException
+   */
+  void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException;
+
+  /**
+   * Flushes all profiles.  Flushes all ProfileBuilders that this distributor is responsible for.
+   *
+   * @return The profile measurements; one for each (profile, entity) pair.
+   */
+  List<ProfileMeasurement> flush();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
new file mode 100644
index 0000000..1945671
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+
+/**
+ * A MessageRoute defines the profile and entity that a telemetry message needs applied to.  This
+ * allows a message to be routed to the profile and entity that needs it.
+ *
+ * One telemetry message may need multiple routes.  This is the case when a message is needed by
+ * more than one profile.  In this case, there will be multiple MessageRoute objects for a single
+ * message.
+ */
+public class MessageRoute {
+
+  /**
+   * The definition of the profile on this route.
+   */
+  private ProfileConfig profileDefinition;
+
+  /**
+   * The entity for this route.
+   */
+  private String entity;
+
+  public MessageRoute(ProfileConfig profileDefinition, String entity) {
+    this.entity = entity;
+    this.profileDefinition = profileDefinition;
+  }
+
+  public String getEntity() {
+    return entity;
+  }
+
+  public void setEntity(String entity) {
+    this.entity = entity;
+  }
+
+  public ProfileConfig getProfileDefinition() {
+    return profileDefinition;
+  }
+
+  public void setProfileDefinition(ProfileConfig profileDefinition) {
+    this.profileDefinition = profileDefinition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
new file mode 100644
index 0000000..99c98a3
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
@@ -0,0 +1,46 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+
+/**
+ * Routes incoming telemetry messages.
+ *
+ * A single telemetry message may need to take multiple routes.  This is the case
+ * when a message is needed by more than one profile.
+ */
+public interface MessageRouter {
+
+  /**
+   * Route a telemetry message.  Finds all routes for a given telemetry message.
+   *
+   * @param message The telemetry message that needs routed.
+   * @param config The configuration for the Profiler.
+   * @param context The Stellar execution context.
+   * @return A list of all the routes for the message.
+   */
+  List<MessageRoute> route(JSONObject message, ProfilerConfig config, Context context);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
index da8db82..9a5407b 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -20,32 +20,8 @@
 
 package org.apache.metron.profiler;
 
-import static java.lang.String.format;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.commons.collections4.ListUtils;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.clock.Clock;
-import org.apache.metron.profiler.clock.WallClock;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
-import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Responsible for building and maintaining a Profile.
@@ -57,85 +33,13 @@ import org.slf4j.LoggerFactory;
  * pairing.  There will exist many instances, one for each [profile, entity] pair that exists
  * within the incoming telemetry data applied to the profile.
  */
-public class ProfileBuilder implements Serializable {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * The name of the profile.
-   */
-  private String profileName;
-
-  /**
-   * The name of the entity.
-   */
-  private String entity;
-
-  /**
-   * The definition of the Profile that the bolt is building.
-   */
-  private ProfileConfig definition;
-
-  /**
-   * Executes Stellar code and maintains state across multiple invocations.
-   */
-  private StellarStatefulExecutor executor;
-
-  /**
-   * Has the profile been initialized?
-   */
-  private boolean isInitialized;
-
-  /**
-   * The duration of each period in milliseconds.
-   */
-  private long periodDurationMillis;
-
-  /**
-   * A clock is used to tell time; imagine that.
-   */
-  private Clock clock;
-
-  /**
-   * Use the ProfileBuilder.Builder to create a new ProfileBuilder.
-   */
-  private ProfileBuilder(ProfileConfig definition,
-                         String entity,
-                         Clock clock,
-                         long periodDurationMillis,
-                         CuratorFramework client,
-                         Map<String, Object> global) {
-
-    this.isInitialized = false;
-    this.definition = definition;
-    this.profileName = definition.getProfile();
-    this.entity = entity;
-    this.clock = clock;
-    this.periodDurationMillis = periodDurationMillis;
-    this.executor = new DefaultStellarStatefulExecutor();
-    Context context = new Context.Builder()
-            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-            .with(Context.Capabilities.STELLAR_CONFIG, () -> global)
-            .build();
-    StellarFunctions.initialize(context);
-    this.executor.setContext(context);
-  }
+public interface ProfileBuilder {
 
   /**
    * Apply a message to the profile.
    * @param message The message to apply.
    */
-  @SuppressWarnings("unchecked")
-  public void apply(JSONObject message) {
-
-    if(!isInitialized()) {
-      assign(definition.getInit(), message, "init");
-      isInitialized = true;
-    }
-
-    assign(definition.getUpdate(), message, "update");
-  }
+  void apply(JSONObject message);
 
   /**
    * Flush the Profile.
@@ -145,203 +49,24 @@ public class ProfileBuilder implements Serializable {
    *
    * @return Returns the completed profile measurement.
    */
-  public ProfileMeasurement flush() {
-    LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
-
-    // execute the 'profile' expression(s)
-    @SuppressWarnings("unchecked")
-    Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile");
-
-    // execute the 'triage' expression(s)
-    Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions()
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                    e -> e.getKey(),
-                    e -> execute(e.getValue(), "result/triage")));
-
-    // execute the 'groupBy' expression(s) - can refer to value of 'result' expression
-    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy");
-
-    isInitialized = false;
-    return new ProfileMeasurement()
-            .withProfileName(profileName)
-            .withEntity(entity)
-            .withGroups(groups)
-            .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS)
-            .withProfileValue(profileValue)
-            .withTriageValues(triageValues)
-            .withDefinition(definition);
-  }
-
-  /**
-   * Executes an expression contained within the profile definition.
-   * @param expression The expression to execute.
-   * @param transientState Additional transient state provided to the expression.
-   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
-   * @return The result of executing the expression.
-   */
-  private Object execute(String expression, Map<String, Object> transientState, String expressionType) {
-    Object result = null;
-
-    List<Object> allResults = execute(Collections.singletonList(expression), transientState, expressionType);
-    if(allResults.size() > 0) {
-      result = allResults.get(0);
-    }
-
-    return result;
-  }
-
-  /**
-   * Executes an expression contained within the profile definition.
-   * @param expression The expression to execute.
-   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
-   * @return The result of executing the expression.
-   */
-  private Object execute(String expression, String expressionType) {
-    return execute(expression, Collections.emptyMap(), expressionType);
-  }
-
-
-  /**
-   * Executes a set of expressions whose results need to be assigned to a variable.
-   * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
-   * @param transientState Additional transient state provided to the expression.
-   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
-   */
-  private void assign(Map<String, String> expressions, Map<String, Object> transientState, String expressionType) {
-    try {
-
-      // execute each of the 'update' expressions
-      MapUtils.emptyIfNull(expressions)
-              .forEach((var, expr) -> executor.assign(var, expr, transientState));
-
-    } catch(ParseException e) {
-
-      // make it brilliantly clear that one of the 'update' expressions is bad
-      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
-      throw new ParseException(msg, e);
-    }
-  }
+  ProfileMeasurement flush();
 
   /**
-   * Executes the expressions contained within the profile definition.
-   * @param expressions A list of expressions to execute.
-   * @param transientState Additional transient state provided to the expressions.
-   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
-   * @return The result of executing each expression.
+   * Has the ProfileBuilder been initialized?
+   * @return True, if initialization has occurred.  False, otherwise.
    */
-  private List<Object> execute(List<String> expressions, Map<String, Object> transientState, String expressionType) {
-    List<Object> results = new ArrayList<>();
-
-    try {
-      ListUtils.emptyIfNull(expressions)
-              .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class)));
-
-    } catch (Throwable e) {
-      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
-      throw new ParseException(msg, e);
-    }
-
-    return results;
-  }
+  boolean isInitialized();
 
   /**
-   * Returns the current value of a variable.
-   * @param variable The name of the variable.
+   * Returns the definition of the profile being built.
+   * @return
    */
-  public Object valueOf(String variable) {
-    return executor.getState().get(variable);
-  }
-
-  public boolean isInitialized() {
-    return isInitialized;
-  }
-
-  public ProfileConfig getDefinition() {
-    return definition;
-  }
+  ProfileConfig getDefinition();
 
   /**
-   * A builder used to construct a new ProfileBuilder.
+   * Returns the value of a variable being maintained by the builder.
+   * @param variable The variable name.
+   * @return The value of the variable.
    */
-  public static class Builder {
-
-    private ProfileConfig definition;
-    private String entity;
-    private long periodDurationMillis;
-    private CuratorFramework zookeeperClient;
-    private Map<String, Object> global;
-    private Clock clock = new WallClock();
-
-    public Builder withClock(Clock clock) {
-      this.clock = clock;
-      return this;
-    }
-
-    /**
-     * @param definition The profiler definition.
-     */
-    public Builder withDefinition(ProfileConfig definition) {
-      this.definition = definition;
-      return this;
-    }
-
-    /**
-     * @param entity The name of the entity
-     */
-    public Builder withEntity(String entity) {
-      this.entity = entity;
-      return this;
-    }
-
-    /**
-     * @param duration The duration of each profile period.
-     * @param units The units used to specify the duration of the profile period.
-     */
-    public Builder withPeriodDuration(long duration, TimeUnit units) {
-      this.periodDurationMillis = units.toMillis(duration);
-      return this;
-    }
-
-    /**
-     * @param millis The duration of each profile period in milliseconds.
-     */
-    public Builder withPeriodDurationMillis(long millis) {
-      this.periodDurationMillis = millis;
-      return this;
-    }
-
-    /**
-     * @param zookeeperClient The zookeeper client.
-     */
-    public Builder withZookeeperClient(CuratorFramework zookeeperClient) {
-      this.zookeeperClient = zookeeperClient;
-      return this;
-    }
-
-    /**
-     * @param global The global configuration.
-     */
-    public Builder withGlobalConfiguration(Map<String, Object> global) {
-      // TODO how does the profile builder ever seen a global that has been update in zookeeper?
-      this.global = global;
-      return this;
-    }
-
-    /**
-     * Construct a ProfileBuilder.
-     */
-    public ProfileBuilder build() {
-
-      if(definition == null) {
-        throw new IllegalArgumentException("missing profiler definition; got null");
-      }
-      if(StringUtils.isEmpty(entity)) {
-        throw new IllegalArgumentException(format("missing entity name; got '%s'", entity));
-      }
-
-      return new ProfileBuilder(definition, entity, clock, periodDurationMillis, zookeeperClient, global);
-    }
-  }
+  Object valueOf(String variable);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
new file mode 100644
index 0000000..cf034c8
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
@@ -0,0 +1,88 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A stand alone version of the Profiler that does not require a
+ * distributed execution environment like Apache Storm.
+ */
+public class StandAloneProfiler {
+
+  /**
+   * The Stellar execution context.
+   */
+  private Context context;
+
+  /**
+   * The configuration for the Profiler.
+   */
+  private ProfilerConfig config;
+
+  /**
+   * The message router.
+   */
+  private MessageRouter router;
+
+  /**
+   * The message distributor.
+   */
+  private MessageDistributor distributor;
+
+  public StandAloneProfiler(ProfilerConfig config, long periodDurationMillis, Context context) {
+    this.context = context;
+    this.config = config;
+    this.router = new DefaultMessageRouter(context);
+    // the period TTL does not matter in this context
+    this.distributor = new DefaultMessageDistributor(periodDurationMillis, Long.MAX_VALUE);
+  }
+
+  /**
+   * Apply a message to a set of profiles.
+   * @param message The message to apply.
+   * @throws ExecutionException
+   */
+  public void apply(JSONObject message) throws ExecutionException {
+
+    List<MessageRoute> routes = router.route(message, config, context);
+    for(MessageRoute route : routes) {
+      distributor.distribute(message, route, context);
+    }
+  }
+
+  /**
+   * Flush the set of profiles.
+   * @return A ProfileMeasurement for each (Profile, Entity) pair.
+   */
+  public List<ProfileMeasurement> flush() {
+    return distributor.flush();
+  }
+
+  public ProfilerConfig getConfig() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
new file mode 100644
index 0000000..ff4c289
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -0,0 +1,157 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class DefaultMessageDistributorTest {
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "value": "22"
+   * }
+   */
+  @Multiline
+  private String inputOne;
+  private JSONObject messageOne;
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.2",
+   *   "value": "22"
+   * }
+   */
+  @Multiline
+  private String inputTwo;
+  private JSONObject messageTwo;
+
+  /**
+   * {
+   *   "profile": "profile-one",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileOne;
+
+  /**
+   * {
+   *   "profile": "profile-two",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileTwo;
+
+  private DefaultMessageDistributor distributor;
+  private Context context;
+
+  @Before
+  public void setup() throws Exception {
+    context = Context.EMPTY_CONTEXT();
+    JSONParser parser = new JSONParser();
+    messageOne = (JSONObject) parser.parse(inputOne);
+    messageTwo = (JSONObject) parser.parse(inputTwo);
+    distributor = new DefaultMessageDistributor(
+            TimeUnit.MINUTES.toMillis(15),
+            TimeUnit.MINUTES.toMillis(30));
+  }
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+
+  /**
+   * Tests that one message can be distributed to one profile.
+   */
+  @Test
+  public void testDistribute() throws Exception {
+    ProfileConfig definition = createDefinition(profileOne);
+    String entity = (String) messageOne.get("ip_src_addr");
+    MessageRoute route = new MessageRoute(definition, entity);
+
+    // distribute one message
+    distributor.distribute(messageOne, route, context);
+
+    // expect one measurement coming from one profile
+    List<ProfileMeasurement> measurements = distributor.flush();
+    assertEquals(1, measurements.size());
+    ProfileMeasurement m = measurements.get(0);
+    assertEquals(definition.getProfile(), m.getProfileName());
+    assertEquals(entity, m.getEntity());
+  }
+
+  @Test
+  public void testDistributeWithTwoProfiles() throws Exception {
+
+    // distribute one message to the first profile
+    String entity = (String) messageOne.get("ip_src_addr");
+    distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entity), context);
+
+    // distribute another message to the second profile, but same entity
+    distributor.distribute(messageOne, new MessageRoute(createDefinition(profileTwo), entity), context);
+
+    // expect 2 measurements; 1 for each profile
+    List<ProfileMeasurement> measurements = distributor.flush();
+    assertEquals(2, measurements.size());
+  }
+
+  @Test
+  public void testDistributeWithTwoEntities() throws Exception {
+
+    // distribute one message
+    String entityOne = (String) messageOne.get("ip_src_addr");
+    distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entityOne), context);
+
+    // distribute another message with a different entity
+    String entityTwo = (String) messageTwo.get("ip_src_addr");
+    distributor.distribute(messageTwo, new MessageRoute(createDefinition(profileTwo), entityTwo), context);
+
+    // expect 2 measurements; 1 for each entity
+    List<ProfileMeasurement> measurements = distributor.flush();
+    assertEquals(2, measurements.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
new file mode 100644
index 0000000..d0c0fbc
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
@@ -0,0 +1,183 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class DefaultMessageRouterTest {
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "value": "22"
+   * }
+   */
+  @Multiline
+  private String inputOne;
+  private JSONObject messageOne;
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.2",
+   *   "value": "22"
+   * }
+   */
+  @Multiline
+  private String inputTwo;
+  private JSONObject messageTwo;
+
+  /**
+   * {
+   *   "profiles": [ ]
+   * }
+   */
+  @Multiline
+  private String noProfiles;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile-one",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "x": "0" },
+   *        "update": { "x": "x + 1" },
+   *        "result": "x"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String oneProfile;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile-one",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "x": "0" },
+   *        "update": { "x": "x + 1" },
+   *        "result": "x"
+   *      },
+   *      {
+   *        "profile": "profile-two",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "x": "0" },
+   *        "update": { "x": "x + 1" },
+   *        "result": "x"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String twoProfiles;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile-one",
+   *        "onlyif": "false",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "x": "0" },
+   *        "update": { "x": "x + 1" },
+   *        "result": "x"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String exclusiveProfile;
+
+  private DefaultMessageRouter router;
+  private Context context;
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfilerConfig createConfig(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfilerConfig.class);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    this.router = new DefaultMessageRouter(Context.EMPTY_CONTEXT());
+    this.context = Context.EMPTY_CONTEXT();
+    JSONParser parser = new JSONParser();
+    this.messageOne = (JSONObject) parser.parse(inputOne);
+    this.messageTwo = (JSONObject) parser.parse(inputTwo);
+  }
+
+  @Test
+  public void testWithOneRoute() throws Exception {
+    List<MessageRoute> routes = router.route(messageOne, createConfig(oneProfile), context);
+
+    assertEquals(1, routes.size());
+    MessageRoute route1 = routes.get(0);
+    assertEquals(messageOne.get("ip_src_addr"), route1.getEntity());
+    assertEquals("profile-one", route1.getProfileDefinition().getProfile());
+  }
+
+  @Test
+  public void testWithNoRoutes() throws Exception {
+    List<MessageRoute> routes = router.route(messageOne, createConfig(noProfiles), context);
+    assertEquals(0, routes.size());
+  }
+
+  @Test
+  public void testWithTwoRoutes() throws Exception {
+    List<MessageRoute> routes = router.route(messageOne, createConfig(twoProfiles), context);
+
+    assertEquals(2, routes.size());
+    {
+      MessageRoute route1 = routes.get(0);
+      assertEquals(messageOne.get("ip_src_addr"), route1.getEntity());
+      assertEquals("profile-one", route1.getProfileDefinition().getProfile());
+    }
+    {
+      MessageRoute route2 = routes.get(1);
+      assertEquals(messageOne.get("ip_src_addr"), route2.getEntity());
+      assertEquals("profile-two", route2.getProfileDefinition().getProfile());
+    }
+  }
+
+  /**
+   * The 'onlyif' condition should exclude some messages from being routed to a profile.
+   */
+  @Test
+  public void testExclusiveProfile() throws Exception {
+    List<MessageRoute> routes = router.route(messageOne, createConfig(exclusiveProfile), context);
+    assertEquals(0, routes.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
new file mode 100644
index 0000000..fa5c19c
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
@@ -0,0 +1,472 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.clock.FixedClock;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the ProfileBuilder class.
+ */
+public class DefaultProfileBuilderTest {
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "ip_dst_addr": "10.0.0.20",
+   *   "value": 100
+   * }
+   */
+  @Multiline
+  private String input;
+  private JSONObject message;
+  private ProfileBuilder builder;
+  private ProfileConfig definition;
+
+  @Before
+  public void setup() throws Exception {
+    message = (JSONObject) new JSONParser().parse(input);
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "100",
+   *     "y": "200"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testInitProfile;
+
+  /**
+   * Ensure that the 'init' block is executed correctly.
+   */
+  @Test
+  public void testInit() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate that x = 100, y = 200
+    assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class));
+  }
+
+  /**
+   * The 'init' block is executed only when the first message is received.  If no message
+   * has been received, the 'init' block will not be executed.
+   */
+  @Test
+  public void testInitWithNoMessage() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    ProfileMeasurement m = builder.flush();
+
+    // validate that x = 0 and y = 0 as no initialization occurred
+    assertEquals(0, (int) convert(m.getProfileValue(), Integer.class));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "0",
+   *     "y": "0"
+   *   },
+   *   "update": {
+   *     "x": "x + 1",
+   *     "y": "y + 2"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testUpdateProfile;
+
+  /**
+   * Ensure that the 'update' expressions are executed for each message applied to the profile.
+   */
+  @Test
+  public void testUpdate() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    int count = 10;
+    for(int i=0; i<count; i++) {
+      builder.apply(message);
+    }
+    ProfileMeasurement m = builder.flush();
+
+    // validate that x=0, y=0 then x+=1, y+=2 for each message
+    assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": { "x": "100" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String testResultProfile;
+
+  /**
+   * Ensure that the result expression is executed on a flush.
+   */
+  @Test
+  public void testResult() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(100, (int) convert(m.getProfileValue(), Integer.class));
+  }
+
+  /**
+   * Ensure that time advances properly on each flush.
+   */
+  @Test
+  public void testProfilePeriodOnFlush() throws Exception {
+    // setup
+    FixedClock clock = new FixedClock();
+    clock.setTime(100);
+
+    definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .withClock(clock)
+            .build();
+
+    {
+      // apply a message and flush
+      builder.apply(message);
+      ProfileMeasurement m = builder.flush();
+
+      // validate the profile period
+      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+      assertEquals(expected, m.getPeriod());
+    }
+    {
+      // advance time by at least one period - 10 minutes
+      clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
+
+      // apply a message and flush again
+      builder.apply(message);
+      ProfileMeasurement m = builder.flush();
+
+      // validate the profile period
+      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+      assertEquals(expected, m.getPeriod());
+    }
+  }
+
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": { "x": "100" },
+   *   "groupBy": ["x * 1", "x * 2"],
+   *   "result": "100.0"
+   * }
+   */
+  @Multiline
+  private String testGroupByProfile;
+
+  /**
+   * Ensure that the 'groupBy' expression is executed correctly.
+   */
+  @Test
+  public void testGroupBy() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(2, m.getGroups().size());
+    assertEquals(100, m.getGroups().get(0));
+    assertEquals(200, m.getGroups().get(1));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "if exists(x) then x else 0",
+   *     "y": "if exists(y) then y else 0"
+   *   },
+   *   "update": {
+   *     "x": "x + 1",
+   *     "y": "y + 2"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testFlushProfile;
+
+  @Test
+  public void testFlushDoesNotClearsState() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute - accumulate some state then flush it
+    int count = 10;
+    for(int i=0; i<count; i++) {
+      builder.apply(message);
+    }
+    builder.flush();
+
+    // apply another message to accumulate new state, then flush again to validate original state was cleared
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(33, m.getProfileValue());
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "0",
+   *     "y": "0"
+   *   },
+   *   "update": {
+   *     "x": "x + 1",
+   *     "y": "y + 2"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testFlushProfileWithNaiveInit;
+
+  @Test
+  public void testFlushDoesNotClearsStateButInitDoes() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute - accumulate some state then flush it
+    int count = 10;
+    for(int i=0; i<count; i++) {
+      builder.apply(message);
+    }
+    builder.flush();
+
+    // apply another message to accumulate new state, then flush again to validate original state was cleared
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(3, m.getProfileValue());
+  }
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "result": "100"
+   * }
+   */
+  @Multiline
+  private String testEntityProfile;
+
+  /**
+   * Ensure that the entity is correctly set on the resulting profile measurements.
+   */
+  @Test
+  public void testEntity() throws Exception {
+    // setup
+    final String entity = "10.0.0.1";
+    definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity(entity)
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(entity, m.getEntity());
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *      "x": "100"
+   *   },
+   *   "result": {
+   *      "profile": "x"
+   *   }
+   * }
+   */
+  @Multiline
+  private String testResultWithProfileExpression;
+
+  /**
+   * Ensure that the result expression is executed on a flush.
+   */
+  @Test
+  public void testResultWithProfileExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(100, m.getProfileValue());
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *      "x": "100"
+   *   },
+   *   "result": {
+   *      "profile": "x",
+   *      "triage": {
+   *        "zero": "x - 100",
+   *        "hundred": "x"
+   *      }
+   *   }
+   * }
+   */
+  @Multiline
+  private String testResultWithTriageExpression;
+
+  /**
+   * Ensure that the result expression is executed on a flush.
+   */
+  @Test
+  public void testResultWithTriageExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(0, m.getTriageValues().get("zero"));
+    assertEquals(100, m.getTriageValues().get("hundred"));
+    assertEquals(100, m.getProfileValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
deleted file mode 100644
index aa632e4..0000000
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.clock.FixedClock;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the ProfileBuilder class.
- */
-public class ProfileBuilderTest {
-
-  /**
-   * {
-   *   "ip_src_addr": "10.0.0.1",
-   *   "ip_dst_addr": "10.0.0.20",
-   *   "value": 100
-   * }
-   */
-  @Multiline
-  private String input;
-  private JSONObject message;
-  private ProfileBuilder builder;
-  private ProfileConfig definition;
-
-  @Before
-  public void setup() throws Exception {
-    message = (JSONObject) new JSONParser().parse(input);
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": {
-   *     "x": "100",
-   *     "y": "200"
-   *   },
-   *   "result": "x + y"
-   * }
-   */
-  @Multiline
-  private String testInitProfile;
-
-  /**
-   * Ensure that the 'init' block is executed correctly.
-   */
-  @Test
-  public void testInit() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate that x = 100, y = 200
-    assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class));
-  }
-
-  /**
-   * The 'init' block is executed only when the first message is received.  If no message
-   * has been received, the 'init' block will not be executed.
-   */
-  @Test
-  public void testInitWithNoMessage() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    ProfileMeasurement m = builder.flush();
-
-    // validate that x = 0 and y = 0 as no initialization occurred
-    assertEquals(0, (int) convert(m.getProfileValue(), Integer.class));
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": {
-   *     "x": "0",
-   *     "y": "0"
-   *   },
-   *   "update": {
-   *     "x": "x + 1",
-   *     "y": "y + 2"
-   *   },
-   *   "result": "x + y"
-   * }
-   */
-  @Multiline
-  private String testUpdateProfile;
-
-  /**
-   * Ensure that the 'update' expressions are executed for each message applied to the profile.
-   */
-  @Test
-  public void testUpdate() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    int count = 10;
-    for(int i=0; i<count; i++) {
-      builder.apply(message);
-    }
-    ProfileMeasurement m = builder.flush();
-
-    // validate that x=0, y=0 then x+=1, y+=2 for each message
-    assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class));
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": { "x": "100" },
-   *   "result": "x"
-   * }
-   */
-  @Multiline
-  private String testResultProfile;
-
-  /**
-   * Ensure that the result expression is executed on a flush.
-   */
-  @Test
-  public void testResult() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(100, (int) convert(m.getProfileValue(), Integer.class));
-  }
-
-  /**
-   * Ensure that time advances properly on each flush.
-   */
-  @Test
-  public void testProfilePeriodOnFlush() throws Exception {
-    // setup
-    FixedClock clock = new FixedClock();
-    clock.setTime(100);
-
-    definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .withClock(clock)
-            .build();
-
-    {
-      // apply a message and flush
-      builder.apply(message);
-      ProfileMeasurement m = builder.flush();
-
-      // validate the profile period
-      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
-      assertEquals(expected, m.getPeriod());
-    }
-    {
-      // advance time by at least one period - 10 minutes
-      clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
-
-      // apply a message and flush again
-      builder.apply(message);
-      ProfileMeasurement m = builder.flush();
-
-      // validate the profile period
-      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
-      assertEquals(expected, m.getPeriod());
-    }
-  }
-
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": { "x": "100" },
-   *   "groupBy": ["x * 1", "x * 2"],
-   *   "result": "100.0"
-   * }
-   */
-  @Multiline
-  private String testGroupByProfile;
-
-  /**
-   * Ensure that the 'groupBy' expression is executed correctly.
-   */
-  @Test
-  public void testGroupBy() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(2, m.getGroups().size());
-    assertEquals(100, m.getGroups().get(0));
-    assertEquals(200, m.getGroups().get(1));
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": {
-   *     "x": "if exists(x) then x else 0",
-   *     "y": "if exists(y) then y else 0"
-   *   },
-   *   "update": {
-   *     "x": "x + 1",
-   *     "y": "y + 2"
-   *   },
-   *   "result": "x + y"
-   * }
-   */
-  @Multiline
-  private String testFlushProfile;
-
-  @Test
-  public void testFlushDoesNotClearsState() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute - accumulate some state then flush it
-    int count = 10;
-    for(int i=0; i<count; i++) {
-      builder.apply(message);
-    }
-    builder.flush();
-
-    // apply another message to accumulate new state, then flush again to validate original state was cleared
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(33, m.getProfileValue());
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": {
-   *     "x": "0",
-   *     "y": "0"
-   *   },
-   *   "update": {
-   *     "x": "x + 1",
-   *     "y": "y + 2"
-   *   },
-   *   "result": "x + y"
-   * }
-   */
-  @Multiline
-  private String testFlushProfileWithNaiveInit;
-
-  @Test
-  public void testFlushDoesNotClearsStateButInitDoes() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute - accumulate some state then flush it
-    int count = 10;
-    for(int i=0; i<count; i++) {
-      builder.apply(message);
-    }
-    builder.flush();
-
-    // apply another message to accumulate new state, then flush again to validate original state was cleared
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(3, m.getProfileValue());
-  }
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "result": "100"
-   * }
-   */
-  @Multiline
-  private String testEntityProfile;
-
-  /**
-   * Ensure that the entity is correctly set on the resulting profile measurements.
-   */
-  @Test
-  public void testEntity() throws Exception {
-    // setup
-    final String entity = "10.0.0.1";
-    definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity(entity)
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(entity, m.getEntity());
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": {
-   *      "x": "100"
-   *   },
-   *   "result": {
-   *      "profile": "x"
-   *   }
-   * }
-   */
-  @Multiline
-  private String testResultWithProfileExpression;
-
-  /**
-   * Ensure that the result expression is executed on a flush.
-   */
-  @Test
-  public void testResultWithProfileExpression() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(100, m.getProfileValue());
-  }
-
-  /**
-   * {
-   *   "profile": "test",
-   *   "foreach": "ip_src_addr",
-   *   "init": {
-   *      "x": "100"
-   *   },
-   *   "result": {
-   *      "profile": "x",
-   *      "triage": {
-   *        "zero": "x - 100",
-   *        "hundred": "x"
-   *      }
-   *   }
-   * }
-   */
-  @Multiline
-  private String testResultWithTriageExpression;
-
-  /**
-   * Ensure that the result expression is executed on a flush.
-   */
-  @Test
-  public void testResultWithTriageExpression() throws Exception {
-    // setup
-    definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
-    builder = new ProfileBuilder.Builder()
-            .withDefinition(definition)
-            .withEntity("10.0.0.1")
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .build();
-
-    // execute
-    builder.apply(message);
-    ProfileMeasurement m = builder.flush();
-
-    // validate
-    assertEquals(0, m.getTriageValues().get("zero"));
-    assertEquals(100, m.getTriageValues().get("hundred"));
-    assertEquals(100, m.getProfileValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 1323089..3c8d875 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -20,22 +20,13 @@
 
 package org.apache.metron.profiler.bolt;
 
-import static java.lang.String.format;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.metron.profiler.ProfileBuilder;
+import org.apache.metron.profiler.DefaultMessageDistributor;
+import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.clock.WallClock;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -47,6 +38,15 @@ import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
 /**
  * A bolt that is responsible for building a Profile.
  *
@@ -75,9 +75,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private long profileTimeToLiveMillis;
 
   /**
-   * Maintains the state of a profile which is unique to a profile/entity pair.
+   * Distributes messages to the profile builders.
    */
-  private transient Cache<String, ProfileBuilder> profileCache;
+  private DefaultMessageDistributor messageDistributor;
 
   /**
    * Parses JSON messages.
@@ -122,10 +122,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     }
     this.collector = collector;
     this.parser = new JSONParser();
-    this.profileCache = CacheBuilder
-            .newBuilder()
-            .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
-            .build();
+    this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis);
   }
 
   @Override
@@ -138,6 +135,15 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     destinationHandlers.forEach(dest -> dest.declareOutputFields(declarer));
   }
 
+  private Context getStellarContext() {
+    Map<String, Object> global = getConfigurations().getGlobalConfig();
+    return new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> global)
+            .build();
+  }
+
   /**
    * Expect to receive either a tick tuple or a telemetry message that needs applied
    * to a profile.
@@ -148,7 +154,6 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     try {
       if(TupleUtils.isTick(input)) {
         handleTick();
-        profileCache.cleanUp();
 
       } else {
         handleMessage(input);
@@ -169,51 +174,23 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    */
   private void handleMessage(Tuple input) throws ExecutionException {
     JSONObject message = getField("message", input, JSONObject.class);
-    getBuilder(input).apply(message);
+    ProfileConfig definition = getField("profile", input, ProfileConfig.class);
+    String entity = getField("entity", input, String.class);
+    MessageRoute route = new MessageRoute(definition, entity);
+
+    messageDistributor.distribute(message, route, getStellarContext());
   }
 
   /**
    * Handles a tick tuple.
    */
   private void handleTick() {
-    profileCache.asMap().forEach((key, profileBuilder) -> {
-      if(profileBuilder.isInitialized()) {
-
-        // flush the profile
-        ProfileMeasurement measurement = profileBuilder.flush();
-
-        // forward the measurement to each destination handler
-        destinationHandlers.forEach(handler -> handler.emit(measurement, collector));
-      }
-    });
-  }
+    List<ProfileMeasurement> measurements = messageDistributor.flush();
 
-  /**
-   * Builds the key that is used to lookup the ProfileState within the cache.
-   * @param tuple A tuple.
-   */
-  private String cacheKey(Tuple tuple) {
-    return format("%s:%s",
-            getField("profile", tuple, ProfileConfig.class),
-            getField("entity", tuple, String.class));
-  }
-
-  /**
-   * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile.  If none exists,
-   * one will be created and returned.
-   * @param tuple The tuple.
-   */
-  protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException {
-    return profileCache.get(
-            cacheKey(tuple),
-            () -> new ProfileBuilder.Builder()
-                    .withDefinition(getField("profile", tuple, ProfileConfig.class))
-                    .withEntity(getField("entity", tuple, String.class))
-                    .withPeriodDurationMillis(periodDurationMillis)
-                    .withGlobalConfiguration(getConfigurations().getGlobalConfig())
-                    .withZookeeperClient(client)
-                    .withClock(new WallClock())
-                    .build());
+    // forward the measurements to each destination handler
+    for(ProfileMeasurement m : measurements ) {
+      destinationHandlers.forEach(handler -> handler.emit(m, collector));
+    }
   }
 
   /**
@@ -255,4 +232,8 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     this.destinationHandlers.add(handler);
     return this;
   }
+
+  public DefaultMessageDistributor getMessageDistributor() {
+    return messageDistributor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index 255069a..a453c66 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -20,16 +20,12 @@
 
 package org.apache.metron.profiler.bolt;
 
-import java.io.UnsupportedEncodingException;
-import java.lang.invoke.MethodHandles;
-import java.util.Map;
 import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.profiler.MessageRouter;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.DefaultMessageRouter;
 import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -42,6 +38,11 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.UnsupportedEncodingException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+
 /**
  * The bolt responsible for filtering incoming messages and directing
  * each to the one or more bolts responsible for building a Profile.  Each
@@ -59,9 +60,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   private transient JSONParser parser;
 
   /**
-   * Executes Stellar code.
+   * The router responsible for routing incoming messages.
    */
-  private StellarStatefulExecutor executor;
+  private MessageRouter router;
 
   /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt.
@@ -75,18 +76,16 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     super.prepare(stormConf, context, collector);
     this.collector = collector;
     this.parser = new JSONParser();
-    this.executor = new DefaultStellarStatefulExecutor();
-    initializeStellar();
+    this.router = new DefaultMessageRouter(getStellarContext());
   }
 
-  protected void initializeStellar() {
-    Context context = new Context.Builder()
+  private Context getStellarContext() {
+    Map<String, Object> global = getConfigurations().getGlobalConfig();
+    return new Context.Builder()
             .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
-            .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> global)
             .build();
-    StellarFunctions.initialize(context);
-    executor.setContext(context);
   }
 
   @Override
@@ -104,7 +103,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   }
 
   private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingException {
-
     // retrieve the input message
     byte[] data = input.getBinary(0);
     JSONObject message = (JSONObject) parser.parse(new String(data, "UTF8"));
@@ -113,9 +111,10 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     ProfilerConfig config = getProfilerConfig();
     if(config != null) {
 
-      // apply the message to each of the profile definitions
-      for (ProfileConfig profile: config.getProfiles()) {
-        applyProfile(profile, input, message);
+      // emit a message for each 'route'
+      List<MessageRoute> routes = router.route(message, config, getStellarContext());
+      for(MessageRoute route : routes) {
+        collector.emit(input, new Values(route.getEntity(), route.getProfileDefinition(), message));
       }
 
     } else {
@@ -124,27 +123,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   }
 
   /**
-   * Applies a message to a Profile definition.
-   * @param profile The profile definition.
-   * @param input The input tuple that delivered the message.
-   * @param message The message that may be needed by the profile.
-   */
-  private void applyProfile(ProfileConfig profile, Tuple input, JSONObject message) throws ParseException, UnsupportedEncodingException {
-    @SuppressWarnings("unchecked")
-    Map<String, Object> state = (Map<String, Object>)message;
-
-    // is this message needed by this profile?
-    if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
-
-      // what is the name of the entity in this message?
-      String entity = executor.execute(profile.getForeach(), state, String.class);
-
-      // emit a message for the bolt responsible for building this profile
-      collector.emit(input, new Values(entity, profile, message));
-    }
-  }
-
-  /**
    * Each emitted tuple contains the following fields.
    * <p>
    * <ol>
@@ -159,11 +137,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     declarer.declare(new Fields("entity", "profile", "message"));
   }
 
-  public StellarStatefulExecutor getExecutor() {
-    return executor;
-  }
-
-  public void setExecutor(StellarStatefulExecutor executor) {
-    this.executor = executor;
+  protected MessageRouter getMessageRouter() {
+    return router;
   }
 }


Mime
View raw message