metron-commits mailing list archives

From nickal...@apache.org
Subject [3/3] metron git commit: METRON-739 Create Local Profile Runner (nickwallen) closes apache/metron#693
Date Tue, 15 Aug 2017 21:33:55 GMT
METRON-739 Create Local Profile Runner (nickwallen) closes apache/metron#693


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/073d6b50
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/073d6b50
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/073d6b50

Branch: refs/heads/master
Commit: 073d6b50def2ebd21e0f9c87b4d039e6dabf7616
Parents: 1f9a791
Author: nickwallen <nick@nickallen.org>
Authored: Tue Aug 15 17:33:16 2017 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Tue Aug 15 17:33:16 2017 -0400

----------------------------------------------------------------------
 .../metron-profiler-client/README.md            |  67 +++
 .../profiler/client/stellar/FixedLookback.java  |   4 +-
 .../profiler/client/stellar/GetProfile.java     |  18 +-
 .../client/stellar/ProfilerClientConfig.java    | 104 ++++
 .../profiler/client/stellar/ProfilerConfig.java | 104 ----
 .../client/stellar/ProfilerFunctions.java       | 214 +++++++++
 .../metron/profiler/client/stellar/Util.java    |   2 +-
 .../profiler/client/stellar/WindowLookback.java |   4 +-
 .../metron/profiler/client/GetProfileTest.java  | 420 -----------------
 .../profiler/client/stellar/GetProfileTest.java | 419 ++++++++++++++++
 .../client/stellar/ProfilerFunctionsTest.java   | 209 ++++++++
 .../client/stellar/WindowLookbackTest.java      |   6 +-
 .../profiler/DefaultMessageDistributor.java     | 147 ++++++
 .../metron/profiler/DefaultMessageRouter.java   |  82 ++++
 .../metron/profiler/DefaultProfileBuilder.java  | 332 +++++++++++++
 .../metron/profiler/MessageDistributor.java     |  56 +++
 .../apache/metron/profiler/MessageRoute.java    |  65 +++
 .../apache/metron/profiler/MessageRouter.java   |  46 ++
 .../apache/metron/profiler/ProfileBuilder.java  | 301 +-----------
 .../metron/profiler/StandAloneProfiler.java     |  88 ++++
 .../profiler/DefaultMessageDistributorTest.java | 157 ++++++
 .../profiler/DefaultMessageRouterTest.java      | 183 +++++++
 .../profiler/DefaultProfileBuilderTest.java     | 472 +++++++++++++++++++
 .../metron/profiler/ProfileBuilderTest.java     | 460 ------------------
 .../profiler/bolt/ProfileBuilderBolt.java       |  97 ++--
 .../profiler/bolt/ProfileSplitterBolt.java      |  70 +--
 .../profiler/bolt/ProfileBuilderBoltTest.java   |  68 +--
 .../profiler/bolt/ProfileSplitterBoltTest.java  |   3 +-
 28 files changed, 2767 insertions(+), 1431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/README.md b/metron-analytics/metron-profiler-client/README.md
index dcf30f6..a8b5a55 100644
--- a/metron-analytics/metron-profiler-client/README.md
+++ b/metron-analytics/metron-profiler-client/README.md
@@ -389,3 +389,70 @@ Returns: The selected profile measurements.
 ```
 
 The client API call above has retrieved the past hour of the 'test' profile for the entity '192.168.138.158'.
+
+## Developing Profiles
+
+Troubleshooting issues when programming against a live stream of data can be difficult.  The Stellar REPL is a powerful tool to help work out the kinds of enrichments and transformations that are needed.  The Stellar REPL can also be used to help when developing profiles for the Profiler.
+
+Follow these steps in the Stellar REPL to see how it can be used to help create profiles.
+
+1.  Take a first pass at defining your profile.  As an example, in the editor copy/paste the basic "Hello, World" profile below.
+    ```
+    [Stellar]>>> conf := SHELL_EDIT()
+    [Stellar]>>> conf
+    {
+      "profiles": [
+        {
+          "profile": "hello-world",
+          "onlyif":  "exists(ip_src_addr)",
+          "foreach": "ip_src_addr",
+          "init":    { "count": "0" },
+          "update":  { "count": "count + 1" },
+          "result":  "count"
+        }
+      ]
+    }
+    ```
+
+1.  Initialize the Profiler.
+    ```
+    [Stellar]>>> profiler := PROFILER_INIT(conf)
+    [Stellar]>>> profiler
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+    ```
+
+1. Create a message to simulate the type of telemetry that you expect to be profiled.   As an example, in the editor copy/paste the JSON below.
+    ```
+    [Stellar]>>> message := SHELL_EDIT()
+    [Stellar]>>> message
+    {
+      "ip_src_addr": "10.0.0.1",
+      "protocol": "HTTPS",
+      "length": "10",
+      "bytes_in": "234"
+    }
+    ```
+
+1. Apply some telemetry messages to your profiles.  The following applies the same message 3 times.
+    ```
+    [Stellar]>>> PROFILER_APPLY(message, profiler)
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+
+    [Stellar]>>> PROFILER_APPLY(message, profiler)
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+
+    [Stellar]>>> PROFILER_APPLY(message, profiler)
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+    ```
+
+1. Flush the Profiler to see what has been calculated.  With the default settings, a flush occurs at the end of each 15-minute period in the Profiler.  The result is a list of profile measurements.  Each measurement is a map containing detailed information about the profile data that has been generated.
+    ```
+    [Stellar]>>> values := PROFILER_FLUSH(profiler)
+    [Stellar]>>> values
+    [{period={duration=900000, period=1669628, start=1502665200000, end=1502666100000}, 
+       profile=hello-world, groups=[], value=3, entity=10.0.0.1}]
+    ```
+    
+    This profile simply counts the number of messages by IP source address.  Notice that the value is '3' for the entity '10.0.0.1' as we applied 3 messages with an 'ip_src_addr' of '10.0.0.1'.  There will always be one measurement for each [profile, entity] pair.
+    
+1. If you are unhappy with the data that has been generated, then 'wash, rinse and repeat' this process.  After you are satisfied with the data being generated by the profile, then follow the [Getting Started](../metron-profiler#getting-started) guide to use the profile against your live, streaming data in a Metron cluster.
\ No newline at end of file
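
Editor's note on the README sample above: the `period` map printed by `PROFILER_FLUSH` is consistent with simple integer arithmetic on epoch milliseconds. The sketch below reproduces the four numbers from the sample output. It assumes that a period index is the timestamp divided by the period duration, rounded down; that rule is inferred from the sample values and is not shown in this diff. The class `PeriodSketch` and its method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of Metron.
class PeriodSketch {

  // Assumption: period index = floor(timestamp / duration).
  public static long periodOf(long timestampMillis, long durationMillis) {
    return Math.floorDiv(timestampMillis, durationMillis);
  }

  // First millisecond that belongs to the period.
  public static long startOf(long period, long durationMillis) {
    return period * durationMillis;
  }

  // Start of the following period.
  public static long endOf(long period, long durationMillis) {
    return startOf(period, durationMillis) + durationMillis;
  }

  // Collects the same four fields that appear in the README sample output.
  public static Map<String, Long> describe(long timestampMillis, long durationMillis) {
    long period = periodOf(timestampMillis, durationMillis);
    Map<String, Long> out = new LinkedHashMap<>();
    out.put("duration", durationMillis);
    out.put("period", period);
    out.put("start", startOf(period, durationMillis));
    out.put("end", endOf(period, durationMillis));
    return out;
  }

  public static void main(String[] args) {
    long duration = TimeUnit.MINUTES.toMillis(15);   // 900000
    long timestamp = 1502665200000L + 1234L;         // any instant inside the sample period
    // prints {duration=900000, period=1669628, start=1502665200000, end=1502666100000}
    System.out.println(describe(timestamp, duration));
  }
}
```

Any timestamp between `start` (inclusive) and `end` (exclusive) maps to the same period index, which is why three messages applied within one period produce a single measurement.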

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
index 02b4a4c..f68f653 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
@@ -55,8 +55,8 @@ public class FixedLookback implements StellarFunction {
       configOverridesMap = rawMap == null || rawMap.isEmpty() ? Optional.empty() : Optional.of(rawMap);
     }
     Map<String, Object> effectiveConfigs = Util.getEffectiveConfig(context, configOverridesMap.orElse(null));
-    Long tickDuration = ProfilerConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
-    TimeUnit tickUnit = TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, String.class));
+    Long tickDuration = ProfilerClientConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
+    TimeUnit tickUnit = TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, String.class));
     long end = System.currentTimeMillis();
     long start = end - units.toMillis(durationAgo);
     return ProfilePeriod.visitPeriods(start, end, tickDuration, tickUnit, Optional.empty(), period -> period);

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index 03ac0a6..802c552 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -20,12 +20,12 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
 import static org.apache.metron.profiler.client.stellar.Util.getArg;
 import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
 
@@ -101,8 +101,6 @@ import org.slf4j.LoggerFactory;
 )
 public class GetProfile implements StellarFunction {
 
-
-
   /**
    * Cached client that can retrieve profile values.
    */
@@ -202,10 +200,6 @@ public class GetProfile implements StellarFunction {
     return groups;
   }
 
-
-
-
-
   /**
    * Creates the ColumnBuilder to use in accessing the profile data.
    * @param global The global configuration.

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
new file mode 100644
index 0000000..351b807
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
@@ -0,0 +1,104 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.hbase.HTableProvider;
+
+import java.util.Map;
+
+public enum ProfilerClientConfig {
+  /**
+   * A global property that defines the name of the HBase table used to store profile data.
+   */
+  PROFILER_HBASE_TABLE("profiler.client.hbase.table", "profiler", String.class),
+
+  /**
+   * A global property that defines the name of the column family used to store profile data.
+   */
+  PROFILER_COLUMN_FAMILY("profiler.client.hbase.column.family", "P", String.class),
+
+  /**
+   * A global property that defines the name of the HBaseTableProvider implementation class.
+   */
+  PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", HTableProvider.class.getName(), String.class),
+
+  /**
+   * A global property that defines the duration of each profile period.  This value
+   * should be defined along with 'profiler.client.period.duration.units'.
+   */
+  PROFILER_PERIOD("profiler.client.period.duration", 15L, Long.class),
+
+  /**
+   * A global property that defines the units of the profile period duration.  This value
+   * should be defined along with 'profiler.client.period.duration'.
+   */
+  PROFILER_PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES", String.class),
+
+  /**
+   * A global property that defines the salt divisor used to store profile data.
+   */
+  PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class);
+
+  String key;
+  Object defaultValue;
+  Class<?> valueType;
+
+  ProfilerClientConfig(String key, Object defaultValue, Class<?> valueType) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+    this.valueType = valueType;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public Object getDefault() {
+    return getDefault(valueType);
+  }
+
+  public <T> T getDefault(Class<T> clazz) {
+    return defaultValue == null?null:ConversionUtils.convert(defaultValue, clazz);
+  }
+
+  public Object get(Map<String, Object> profilerConfig) {
+    return getOrDefault(profilerConfig, defaultValue);
+  }
+
+  public Object getOrDefault(Map<String, Object> profilerConfig, Object defaultValue) {
+    return getOrDefault(profilerConfig, defaultValue, valueType);
+  }
+
+  public <T> T get(Map<String, Object> profilerConfig, Class<T> clazz) {
+    return getOrDefault(profilerConfig, defaultValue, clazz);
+  }
+
+  public <T> T getOrDefault(Map<String, Object> profilerConfig, Object defaultValue, Class<T> clazz) {
+    Object o = profilerConfig.getOrDefault(key, defaultValue);
+    return o == null?null:ConversionUtils.convert(o, clazz);
+  }
+
+  @Override
+  public String toString() {
+    return key;
+  }
+}
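
Editor's note on `ProfilerClientConfig` above: the enum pairs each global property key with a default, so callers always get a usable value even when the global configuration is empty. The following is a minimal sketch of that pattern, not the Metron class itself. `ClientSetting`, `ClientSettingDemo`, and the simple number/string conversion are hypothetical stand-ins (the real code delegates to `ConversionUtils.convert`); only the two property keys and their defaults are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; shows how the period length is derived from two settings.
class ClientSettingDemo {

  public static long periodMillis(Map<String, Object> global) {
    long duration = ClientSetting.PERIOD.getLong(global);
    TimeUnit units = TimeUnit.valueOf(ClientSetting.PERIOD_UNITS.getString(global));
    return units.toMillis(duration);
  }

  public static void main(String[] args) {
    Map<String, Object> global = new HashMap<>();
    System.out.println(periodMillis(global));   // defaults apply: 15 MINUTES -> 900000

    global.put("profiler.client.period.duration", "1");
    global.put("profiler.client.period.duration.units", "HOURS");
    System.out.println(periodMillis(global));   // overrides apply: 1 HOURS -> 3600000
  }
}

// Hypothetical, reduced version of the key-plus-default enum pattern.
enum ClientSetting {
  PERIOD("profiler.client.period.duration", 15L),
  PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES");

  private final String key;
  private final Object defaultValue;

  ClientSetting(String key, Object defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  // Returns the configured value, or the default when the key is absent.
  Object get(Map<String, Object> config) {
    return config.getOrDefault(key, defaultValue);
  }

  // Simplified conversion; accepts numbers or numeric strings.
  long getLong(Map<String, Object> config) {
    Object o = get(config);
    return (o instanceof Number) ? ((Number) o).longValue() : Long.parseLong(String.valueOf(o));
  }

  String getString(Map<String, Object> config) {
    return String.valueOf(get(config));
  }
}
```

Keeping the key and default together in one enum constant means the lookup sites (`FixedLookback`, `WindowLookback`, `GetProfile`) never need to repeat the literal property names.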

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
deleted file mode 100644
index e2ec275..0000000
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.client.stellar;
-
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.metron.hbase.HTableProvider;
-
-import java.util.Map;
-
-public enum ProfilerConfig {
-  /**
-   * A global property that defines the name of the HBase table used to store profile data.
-   */
-  PROFILER_HBASE_TABLE("profiler.client.hbase.table", "profiler", String.class),
-
-  /**
-   * A global property that defines the name of the column family used to store profile data.
-   */
-  PROFILER_COLUMN_FAMILY("profiler.client.hbase.column.family", "P", String.class),
-
-  /**
-   * A global property that defines the name of the HBaseTableProvider implementation class.
-   */
-  PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", HTableProvider.class.getName(), String.class),
-
-  /**
-   * A global property that defines the duration of each profile period.  This value
-   * should be defined along with 'profiler.client.period.duration.units'.
-   */
-  PROFILER_PERIOD("profiler.client.period.duration", 15L, Long.class),
-
-  /**
-   * A global property that defines the units of the profile period duration.  This value
-   * should be defined along with 'profiler.client.period.duration'.
-   */
-  PROFILER_PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES", String.class),
-
-  /**
-   * A global property that defines the salt divisor used to store profile data.
-   */
-  PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class);
-
-  String key;
-  Object defaultValue;
-  Class<?> valueType;
-  ProfilerConfig(String key, Object defaultValue, Class<?> valueType) {
-    this.key = key;
-    this.defaultValue = defaultValue;
-    this.valueType = valueType;
-  }
-
-  public String getKey() {
-    return key;
-  }
-
-  public Object getDefault() {
-    return getDefault(valueType);
-  }
-
-  public <T> T getDefault(Class<T> clazz) {
-    return defaultValue == null?null:ConversionUtils.convert(defaultValue, clazz);
-  }
-
-  public Object get(Map<String, Object> profilerConfig) {
-    return getOrDefault(profilerConfig, defaultValue);
-  }
-
-  public Object getOrDefault(Map<String, Object> profilerConfig, Object defaultValue) {
-    return getOrDefault(profilerConfig, defaultValue, valueType);
-  }
-
-  public <T> T get(Map<String, Object> profilerConfig, Class<T> clazz) {
-    return getOrDefault(profilerConfig, defaultValue, clazz);
-  }
-
-  public <T> T getOrDefault(Map<String, Object> profilerConfig, Object defaultValue, Class<T> clazz) {
-    Object o = profilerConfig.getOrDefault(key, defaultValue);
-    return o == null?null:ConversionUtils.convert(o, clazz);
-  }
-
-  @Override
-  public String toString() {
-    return key;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
new file mode 100644
index 0000000..827e1c4
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
@@ -0,0 +1,214 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.StandAloneProfiler;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.Util.getArg;
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
+/**
+ * Stellar functions that allow interaction with the core Profiler components
+ * through the Stellar REPL.
+ */
+public class ProfilerFunctions {
+
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Stellar(
+          namespace="PROFILER",
+          name="INIT",
+          description="Creates a local profile runner that can execute profiles.",
+          params={
+                  "config", "The profiler configuration as a string."
+          },
+          returns="A local profile runner."
+  )
+  public static class ProfilerInit implements StellarFunction {
+
+    @Override
+    public void initialize(Context context) {
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> global = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG, false)
+              .orElse(Collections.emptyMap());
+
+      // how long is the profile period?
+      long duration = PROFILER_PERIOD.getOrDefault(global, PROFILER_PERIOD.getDefault(), Long.class);
+      String configuredUnits = PROFILER_PERIOD_UNITS.getOrDefault(global, PROFILER_PERIOD_UNITS.getDefault(), String.class);
+      long periodDurationMillis = TimeUnit.valueOf(configuredUnits).toMillis(duration);
+
+      // user must provide the configuration for the profiler
+      String arg0 = getArg(0, String.class, args);
+      ProfilerConfig profilerConfig;
+      try {
+        profilerConfig = JSONUtils.INSTANCE.load(arg0, ProfilerConfig.class);
+
+      } catch(IOException e) {
+        throw new IllegalArgumentException("Invalid profiler configuration", e);
+      }
+
+      return new StandAloneProfiler(profilerConfig, periodDurationMillis, context);
+    }
+  }
+
+  @Stellar(
+          namespace="PROFILER",
+          name="APPLY",
+          description="Apply a message to a local profile runner.",
+          params={
+                  "message", "The message to apply.",
+                  "profiler", "A local profile runner returned by PROFILER_INIT."
+          },
+          returns="The local profile runner."
+  )
+  public static class ProfilerApply implements StellarFunction {
+
+    private JSONParser parser;
+
+    @Override
+    public void initialize(Context context) {
+      parser = new JSONParser();
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return parser != null;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+
+      // user must provide the json telemetry message
+      String arg0 = Util.getArg(0, String.class, args);
+      if(arg0 == null) {
+        throw new IllegalArgumentException(format("expected string, found null"));
+      }
+
+      // parse the message
+      JSONObject message;
+      try {
+        message = (JSONObject) parser.parse(arg0);
+
+      } catch(org.json.simple.parser.ParseException e) {
+        throw new IllegalArgumentException("invalid message", e);
+      }
+
+      // user must provide the stand alone profiler
+      StandAloneProfiler profiler = Util.getArg(1, StandAloneProfiler.class, args);
+      try {
+        profiler.apply(message);
+
+      } catch(ExecutionException e) {
+        throw new IllegalArgumentException(e);
+      }
+
+      return profiler;
+    }
+  }
+
+  @Stellar(
+          namespace="PROFILER",
+          name="FLUSH",
+          description="Flush a local profile runner.",
+          params={
+                  "profiler", "A local profile runner returned by PROFILER_INIT."
+          },
+          returns="A list of the profile values."
+  )
+  public static class ProfilerFlush implements StellarFunction {
+
+    @Override
+    public void initialize(Context context) {
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+
+      // user must provide the stand-alone profiler
+      StandAloneProfiler profiler = Util.getArg(0, StandAloneProfiler.class, args);
+      if(profiler == null) {
+        throw new IllegalArgumentException(format("expected the profiler returned by PROFILER_INIT, found null"));
+      }
+
+      // transform the profile measurements into maps to simplify manipulation in stellar
+      List<Map<String, Object>> measurements = new ArrayList<>();
+      for(ProfileMeasurement m : profiler.flush()) {
+
+        // create a map for the profile period
+        Map<String, Object> period = new HashMap<>();
+        period.put("period", m.getPeriod().getPeriod());
+        period.put("start", m.getPeriod().getStartTimeMillis());
+        period.put("duration", m.getPeriod().getDurationMillis());
+        period.put("end", m.getPeriod().getEndTimeMillis());
+
+        // create a map for the measurement
+        Map<String, Object> measurement = new HashMap<>();
+        measurement.put("profile", m.getProfileName());
+        measurement.put("entity", m.getEntity());
+        measurement.put("value", m.getProfileValue());
+        measurement.put("groups", m.getGroups());
+        measurement.put("period", period);
+
+        measurements.add(measurement);
+      }
+
+      return measurements;
+    }
+  }
+}
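
Editor's note on `ProfilerFlush` above: the function converts each measurement into nested maps so that the result can be inspected with ordinary map and list operations. The sketch below mirrors that shape with a hypothetical, simplified `Measurement` class standing in for `ProfileMeasurement`; the field names in the maps follow the diff, while the class, constructor, and start/end arithmetic are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the Metron implementation.
class FlushSketch {

  // Simplified stand-in for ProfileMeasurement.
  static class Measurement {
    final String profile;
    final String entity;
    final Object value;
    final long period;
    final long durationMillis;

    Measurement(String profile, String entity, Object value, long period, long durationMillis) {
      this.profile = profile;
      this.entity = entity;
      this.value = value;
      this.period = period;
      this.durationMillis = durationMillis;
    }
  }

  // Flattens one measurement into the nested-map shape returned by the flush function.
  public static Map<String, Object> toMap(Measurement m) {
    Map<String, Object> period = new HashMap<>();
    period.put("period", m.period);
    period.put("start", m.period * m.durationMillis);        // assumed arithmetic
    period.put("duration", m.durationMillis);
    period.put("end", (m.period + 1) * m.durationMillis);    // assumed arithmetic

    Map<String, Object> measurement = new HashMap<>();
    measurement.put("profile", m.profile);
    measurement.put("entity", m.entity);
    measurement.put("value", m.value);
    measurement.put("groups", Collections.emptyList());
    measurement.put("period", period);
    return measurement;
  }

  public static List<Map<String, Object>> toMaps(List<Measurement> measurements) {
    List<Map<String, Object>> out = new ArrayList<>();
    for (Measurement m : measurements) {
      out.add(toMap(m));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Measurement> flushed = new ArrayList<>();
    flushed.add(new Measurement("hello-world", "10.0.0.1", 3, 1669628L, 900000L));

    Map<String, Object> first = toMaps(flushed).get(0);
    System.out.println(first.get("entity") + " -> " + first.get("value"));
    System.out.println(((Map<?, ?>) first.get("period")).get("start"));
  }
}
```

Because the maps are `HashMap` instances, the key order seen when printing a whole measurement is not guaranteed; reading individual keys, as in `main`, avoids depending on it.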

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
index 279a60c..82c7fba 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
@@ -82,7 +82,7 @@ public class Util {
     Map<String, Object> result = new HashMap<>(6);
 
     // extract the relevant parameters from global, the overrides and the defaults
-    for (ProfilerConfig k : ProfilerConfig.values()) {
+    for (ProfilerClientConfig k : ProfilerClientConfig.values()) {
       Object globalValue = global.containsKey(k.key)?ConversionUtils.convert(global.get(k.key), k.valueType):null;
       Object overrideValue = configOverridesMap == null?null:k.getOrDefault(configOverridesMap, null);
       Object defaultValue = k.defaultValue;

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
index 9e420e5..273b244 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
@@ -75,8 +75,8 @@ public class WindowLookback implements StellarFunction {
 
     }
     Map<String, Object> effectiveConfigs = Util.getEffectiveConfig(context, configOverridesMap.orElse(null));
-    Long tickDuration = ProfilerConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
-    TimeUnit tickUnit = TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, String.class));
+    Long tickDuration = ProfilerClientConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
+    TimeUnit tickUnit = TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, String.class));
     Window w = null;
     try {
       w = windowCache.get(windowSelector, () -> WindowProcessor.process(windowSelector));

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
deleted file mode 100644
index 00d842c..0000000
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.client;
-
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.stellar.FixedLookback;
-import org.apache.metron.profiler.client.stellar.GetProfile;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.*;
-
-/**
- * Tests the GetProfile class.
- */
-public class GetProfileTest {
-
-  private static final long periodDuration = 15;
-  private static final TimeUnit periodUnits = TimeUnit.MINUTES;
-  private static final int saltDivisor = 1000;
-  private static final String tableName = "profiler";
-  private static final String columnFamily = "P";
-  private StellarStatefulExecutor executor;
-  private Map<String, Object> state;
-  private ProfileWriter profileWriter;
-  // different values of period and salt divisor, used to test config_overrides feature
-  private static final long periodDuration2 = 1;
-  private static final TimeUnit periodUnits2 = TimeUnit.HOURS;
-  private static final int saltDivisor2 = 2050;
-
-
-
-  private <T> T run(String expression, Class<T> clazz) {
-    return executor.execute(expression, state, clazz);
-  }
-
-  /**
-   * This method sets up the configuration context for both writing profile data
-   * (using profileWriter to mock the complex process of what the Profiler topology
-   * actually does), and then reading that profile data (thereby testing the PROFILE_GET
-   * Stellar client implemented in GetProfile).
-   *
-   * It runs at @Before time, and sets testclass global variables used by the writers and readers.
-   * The various writers and readers are in each test case, not here.
-   *
-   * @return void
-   */
-  @Before
-  public void setup() {
-    state = new HashMap<>();
-    final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
-    // used to write values to be read during testing
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
-    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
-
-    // global properties
-    Map<String, Object> global = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
-    }};
-
-    // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class),
-            new Context.Builder()
-                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-                    .build());
-  }
-
-  /**
-   * This method is similar to setup(), in that it sets up profiler configuration context,
-   * but only for the client.  Additionally, it uses periodDuration2, periodUnits2
-   * and saltDivisor2, instead of periodDuration, periodUnits and saltDivisor respectively.
-   *
-   * This is used in the unit tests that test the config_overrides feature of PROFILE_GET.
-   * In these tests, the context from @Before setup() is used to write the data, then the global
-   * context is changed to context2 (from this method).  Each test validates that a default read
-   * using global context2 then gets no valid results (as expected), and that a read using
-   * original context values in the PROFILE_GET config_overrides argument gets all expected results.
-   *
-   * @return context2 - The profiler client configuration context created by this method.
-   *    The context2 values are also set in the configuration of the StellarStatefulExecutor
-   *    stored in the global variable 'executor'.  However, there is no API for querying the
-   *    context values from a StellarStatefulExecutor, so we output the context2 Context object itself,
-   *    for validation purposes (so that its values can be validated as being significantly
-   *    different from the setup() settings).
-   */
-  private Context setup2() {
-    state = new HashMap<>();
-
-    // global properties
-    Map<String, Object> global = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
-    }};
-
-    // create the modified context
-    Context context2 = new Context.Builder()
-            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-            .build();
-
-    // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class),
-            context2);
-
-    return context2; //because there is no executor.getContext() method
-  }
-
-  /**
-   * Values should be retrievable that have NOT been stored within a group.
-   */
-  @Test
-  public void testWithNoGroups() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // execute - read the profile values - no groups
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group'.
-   */
-  @Test
-  public void testWithOneGroup() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // test the deprecated but allowed "varargs" form of groups specification
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekends')";
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group'.
-   */
-  @Test
-  public void testWithTwoGroups() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekdays", "tuesday");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekdays', 'tuesday'])";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // test the deprecated but allowed "varargs" form of groups specification
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekdays', 'tuesday')";
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Initialization should fail if the required context values are missing.
-   */
-  @Test(expected = IllegalStateException.class)
-  public void testMissingContext() {
-    Context empty = Context.EMPTY_CONTEXT();
-
-    // 'unset' the context that was created during setup()
-    executor.setContext(empty);
-
-    // force re-initialization with no context
-    SingletonFunctionResolver.getInstance().initialize(empty);
-
-    // validate - function should be unable to initialize
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 'SECONDS'), groups)";
-    run(expr, List.class);
-  }
-
-  /**
-   * If the time horizon specified does not include any profile measurements, then
-   * none should be returned.
-   */
-  @Test
-  public void testOutsideTimeHorizon() {
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write a single value from 2 hours ago
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, 1, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - there should be no values from only 4 seconds ago
-    Assert.assertEquals(0, result.size());
-  }
-
-  /**
-   * Values should be retrievable that were written with configuration different than current global config.
-   */
-  @Test
-  public void testWithConfigOverride() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // now change the executor configuration
-    Context context2 = setup2();
-    // validate it is changed in significant way
-    @SuppressWarnings("unchecked")
-    Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
-    Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
-    Assert.assertNotEquals(periodDuration, periodDuration2);
-
-    // execute - read the profile values - with (wrong) default global config values.
-    // No error message at this time, but returns empty results list, because
-    // row keys are not correctly calculated.
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to fail to read any values
-    Assert.assertEquals(0, result.size());
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
-            + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
-            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + overrides + "), [], " + overrides + ")"
-            ;
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group', with
-   * configuration different than current global config.
-   * This time put the config_override case before the non-override case.
-   */
-  @Test
-  public void testWithConfigAndOneGroup() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // now change the executor configuration
-    Context context2 = setup2();
-    // validate it is changed in significant way
-    @SuppressWarnings("unchecked")
-    Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
-    Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), Long.toString(periodDuration2));
-    Assert.assertNotEquals(periodDuration, periodDuration2);
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
-            + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
-            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
-    String expr = "PROFILE_GET('profile1', 'entity1'" +
-            ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
-            overrides + ")";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // execute - read the profile values - with (wrong) default global config values.
-    // No error message at this time, but returns empty results list, because
-    // row keys are not correctly calculated.
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
-    result = run(expr, List.class);
-
-    // validate - expect to fail to read any values
-    Assert.assertEquals(0, result.size());
-  }
-
-}

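The config_overrides tests rely on the fact that a read made with the wrong period duration silently returns an empty list, because the row keys it computes do not match the ones that were written. A minimal sketch of that effect is below. It assumes a period is identified by dividing the epoch timestamp by the period length and that the salt is derived from that identifier. The key layout, class name, and helper names are hypothetical simplifications for illustration and are not the actual SaltyRowKeyBuilder encoding.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class PeriodKeySketch {

  // Assumption: a period is identified by epoch millis divided by the period length.
  static long periodId(long epochMillis, long duration, TimeUnit units) {
    return epochMillis / units.toMillis(duration);
  }

  // Hypothetical key layout: salt, profile, entity, period id.
  static String rowKey(String profile, String entity, long periodId, int saltDivisor) {
    int salt = Math.floorMod(Long.hashCode(periodId), saltDivisor);
    return salt + ":" + profile + ":" + entity + ":" + periodId;
  }

  // One key per period, starting at 'start' and stopping before 'end'.
  static Set<String> keys(long start, long end, long duration, TimeUnit units, int saltDivisor) {
    Set<String> keys = new HashSet<>();
    long step = units.toMillis(duration);
    for (long t = start; t < end; t += step) {
      keys.add(rowKey("profile1", "entity1", periodId(t, duration, units), saltDivisor));
    }
    return keys;
  }

  public static void main(String[] args) {
    long now = 1502832836000L; // fixed timestamp so the output is repeatable
    long twoHoursAgo = now - TimeUnit.HOURS.toMillis(2);
    long fourHoursAgo = now - TimeUnit.HOURS.toMillis(4);

    // written with a 15 minute period and a salt divisor of 1000
    Set<String> written = keys(twoHoursAgo, now, 15, TimeUnit.MINUTES, 1000);

    // read with the changed global settings: 1 hour period, salt divisor of 2050
    Set<String> withGlobal = keys(fourHoursAgo, now, 1, TimeUnit.HOURS, 2050);
    withGlobal.retainAll(written);

    // read with overrides that restore the settings used at write time
    Set<String> withOverrides = keys(fourHoursAgo, now, 15, TimeUnit.MINUTES, 1000);
    withOverrides.retainAll(written);

    // prints "written=8 matchedWithGlobal=0 matchedWithOverrides=8"
    System.out.println("written=" + written.size()
        + " matchedWithGlobal=" + withGlobal.size()
        + " matchedWithOverrides=" + withOverrides.size());
  }
}
```

Under these assumptions the mismatched read produces no error, only keys that match nothing, which is the behavior the tests assert with an expected result size of 0.
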
http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
new file mode 100644
index 0000000..307b548
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
@@ -0,0 +1,419 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.client.ProfileWriter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.client.stellar.FixedLookback;
+import org.apache.metron.profiler.client.stellar.GetProfile;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.*;
+
+/**
+ * Tests the GetProfile class.
+ */
+public class GetProfileTest {
+
+  private static final long periodDuration = 15;
+  private static final TimeUnit periodUnits = TimeUnit.MINUTES;
+  private static final int saltDivisor = 1000;
+  private static final String tableName = "profiler";
+  private static final String columnFamily = "P";
+  private StellarStatefulExecutor executor;
+  private Map<String, Object> state;
+  private ProfileWriter profileWriter;
+  // different values of period and salt divisor, used to test config_overrides feature
+  private static final long periodDuration2 = 1;
+  private static final TimeUnit periodUnits2 = TimeUnit.HOURS;
+  private static final int saltDivisor2 = 2050;
+
+  private <T> T run(String expression, Class<T> clazz) {
+    return executor.execute(expression, state, clazz);
+  }
+
+  /**
+   * This method sets up the configuration context for both writing profile data
+   * (using profileWriter to mock the complex process of what the Profiler topology
+   * actually does), and then reading that profile data (thereby testing the PROFILE_GET
+   * Stellar client implemented in GetProfile).
+   *
+   * It runs before each test (via @Before) and sets the test class fields used by
+   * the writers and readers.  The writers and readers themselves live in each
+   * test case, not here.
+   *
+   */
+  @Before
+  public void setup() {
+    state = new HashMap<>();
+    final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
+
+    // used to write values to be read during testing
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
+      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
+      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
+    }};
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(GetProfile.class)
+                    .withClass(FixedLookback.class),
+            new Context.Builder()
+                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+                    .build());
+  }
+
+  /**
+   * This method is similar to setup(), in that it sets up profiler configuration context,
+   * but only for the client.  Additionally, it uses periodDuration2, periodUnits2
+   * and saltDivisor2, instead of periodDuration, periodUnits and saltDivisor respectively.
+   *
+   * This is used in the unit tests that test the config_overrides feature of PROFILE_GET.
+   * In these tests, the context from @Before setup() is used to write the data, then the global
+   * context is changed to context2 (from this method).  Each test validates that a default read
+   * using global context2 then gets no valid results (as expected), and that a read using
+   * original context values in the PROFILE_GET config_overrides argument gets all expected results.
+   *
+   * @return context2 - The profiler client configuration context created by this method.
+   *    The context2 values are also set in the configuration of the StellarStatefulExecutor
+   *    stored in the global variable 'executor'.  However, there is no API for querying the
+   *    context values from a StellarStatefulExecutor, so we output the context2 Context object itself,
+   *    for validation purposes (so that its values can be validated as being significantly
+   *    different from the setup() settings).
+   */
+  private Context setup2() {
+    state = new HashMap<>();
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
+      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
+      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
+    }};
+
+    // create the modified context
+    Context context2 = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .build();
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(GetProfile.class)
+                    .withClass(FixedLookback.class),
+            context2);
+
+    return context2; //because there is no executor.getContext() method
+  }
+
+  /**
+   * Values should be retrievable that have NOT been stored within a group.
+   */
+  @Test
+  public void testWithNoGroups() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // execute - read the profile values - no groups
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group'.
+   */
+  @Test
+  public void testWithOneGroup() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekends");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+
+    // test the deprecated but allowed "varargs" form of groups specification
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekends')";
+    result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within two groups.
+   */
+  @Test
+  public void testWithTwoGroups() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekdays", "tuesday");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekdays', 'tuesday'])";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+
+    // test the deprecated but allowed "varargs" form of groups specification
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekdays', 'tuesday')";
+    result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Initialization should fail if the required context values are missing.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testMissingContext() {
+    Context empty = Context.EMPTY_CONTEXT();
+
+    // 'unset' the context that was created during setup()
+    executor.setContext(empty);
+
+    // force re-initialization with no context
+    SingletonFunctionResolver.getInstance().initialize(empty);
+
+    // validate - function should be unable to initialize
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 'SECONDS'), groups)";
+    run(expr, List.class);
+  }
+
+  /**
+   * If the time horizon specified does not include any profile measurements, then
+   * none should be returned.
+   */
+  @Test
+  public void testOutsideTimeHorizon() {
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write a single value from 2 hours ago
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, 1, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - there should be no values from only 4 seconds ago
+    Assert.assertEquals(0, result.size());
+  }
+
+  /**
+   * Values should be retrievable that were written with a configuration different from the current global config.
+   */
+  @Test
+  public void testWithConfigOverride() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // now change the executor configuration
+    Context context2 = setup2();
+    // validate that it has changed in a significant way
+    @SuppressWarnings("unchecked")
+    Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
+    Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
+    Assert.assertNotEquals(periodDuration, periodDuration2);
+
+    // execute - read the profile values - with (wrong) default global config values.
+    // No error message at this time, but returns empty results list, because
+    // row keys are not correctly calculated.
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to fail to read any values
+    Assert.assertEquals(0, result.size());
+
+    // execute - read the profile values - with config_override.
+    // first two override values are strings, third is deliberately a number.
+    String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
+            + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
+            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + overrides + "), [], " + overrides + ")"
+            ;
+    result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group', with
+   * configuration different than current global config.
+   * This time put the config_override case before the non-override case.
+   */
+  @Test
+  public void testWithConfigAndOneGroup() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekends");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // now change the executor configuration
+    Context context2 = setup2();
+    // validate that it has changed in a significant way
+    @SuppressWarnings("unchecked")
+    Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
+    Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), Long.toString(periodDuration2));
+    Assert.assertNotEquals(periodDuration, periodDuration2);
+
+    // execute - read the profile values - with config_override.
+    // first two override values are strings, third is deliberately a number.
+    String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
+            + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
+            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
+    String expr = "PROFILE_GET('profile1', 'entity1'" +
+            ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
+            overrides + ")";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+
+    // execute - read the profile values - with (wrong) default global config values.
+    // No error message at this time, but returns empty results list, because
+    // row keys are not correctly calculated.
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
+    result = run(expr, List.class);
+
+    // validate - expect to fail to read any values
+    Assert.assertEquals(0, result.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
new file mode 100644
index 0000000..9cc8046
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
@@ -0,0 +1,209 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.StandAloneProfiler;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.common.shell.StellarExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the ProfilerFunctions class.
+ */
+public class ProfilerFunctionsTest {
+
+  /**
+   * {
+   *    "ip_src_addr": "10.0.0.1",
+   *    "ip_dst_addr": "10.0.0.2",
+   *    "source.type": "test"
+   * }
+   */
+  @Multiline
+  private String message;
+
+  /**
+   * {
+   *   "profiles": [
+   *        {
+   *          "profile":  "hello-world",
+   *          "foreach":  "ip_src_addr",
+   *          "init":     { "count": 0 },
+   *          "update":   { "count": "count + 1" },
+   *          "result":   "count"
+   *        }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String helloWorldProfilerDef;
+
+  private static final long periodDuration = 15;
+  private static final String periodUnits = "MINUTES";
+  private StellarStatefulExecutor executor;
+  private Map<String, Object> state;
+
+  private <T> T run(String expression, Class<T> clazz) {
+    return executor.execute(expression, state, clazz);
+  }
+
+  @Before
+  public void setup() {
+    state = new HashMap<>();
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
+      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
+    }};
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(ProfilerFunctions.ProfilerInit.class)
+                    .withClass(ProfilerFunctions.ProfilerApply.class)
+                    .withClass(ProfilerFunctions.ProfilerFlush.class),
+            new Context.Builder()
+                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+                    .build());
+  }
+
+  @Test
+  public void testProfilerInitNoProfiles() {
+    state.put("config", "{ \"profiles\" : [] }");
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);
+    assertNotNull(profiler);
+    assertEquals(0, profiler.getConfig().getProfiles().size());
+  }
+
+  @Test
+  public void testProfilerInitWithProfiles() {
+    state.put("config", helloWorldProfilerDef);
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);
+    assertNotNull(profiler);
+    assertEquals(1, profiler.getConfig().getProfiles().size());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerInitNoArgs() {
+    run("PROFILER_INIT()", StandAloneProfiler.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerInitInvalidArg() {
+    run("PROFILER_INIT({ \"invalid\": 2 })", StandAloneProfiler.class);
+  }
+
+  @Test
+  public void testProfilerInitWithNoGlobalConfig() {
+    state.put("config", helloWorldProfilerDef);
+    String expression = "PROFILER_INIT(config)";
+
+    // use an executor with no GLOBAL_CONFIG defined in the context
+    StellarStatefulExecutor executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(ProfilerFunctions.ProfilerInit.class)
+                    .withClass(ProfilerFunctions.ProfilerApply.class)
+                    .withClass(ProfilerFunctions.ProfilerFlush.class),
+            Context.EMPTY_CONTEXT());
+    StandAloneProfiler profiler = executor.execute(expression, state, StandAloneProfiler.class);
+
+    assertNotNull(profiler);
+    assertEquals(1, profiler.getConfig().getProfiles().size());
+  }
+
+  @Test
+  public void testProfilerApply() {
+
+    // initialize the profiler
+    state.put("config", helloWorldProfilerDef);
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);
+    state.put("profiler", profiler);
+
+    // apply a message to the profiler
+    state.put("message", message);
+    StandAloneProfiler result = run("PROFILER_APPLY(message, profiler)", StandAloneProfiler.class);
+    assertSame(profiler, result);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerApplyNoArgs() {
+    run("PROFILER_APPLY()", StandAloneProfiler.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerApplyInvalidArg() {
+    run("PROFILER_APPLY(undefined)", StandAloneProfiler.class);
+  }
+
+  @Test
+  public void testProfilerFlush() {
+
+    // initialize the profiler
+    state.put("config", helloWorldProfilerDef);
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);
+    state.put("profiler", profiler);
+
+    // apply a message to the profiler
+    state.put("message", message);
+    run("PROFILER_APPLY(message, profiler)", StandAloneProfiler.class);
+
+    // flush the profiles
+    List<Map<String, Object>> measurements = run("PROFILER_FLUSH(profiler)", List.class);
+
+    // validate
+    assertNotNull(measurements);
+    assertEquals(1, measurements.size());
+
+    Map<String, Object> measurement = measurements.get(0);
+    assertEquals("hello-world", measurement.get("profile"));
+    assertEquals("10.0.0.1", measurement.get("entity"));
+    assertEquals(1, measurement.get("value"));
+    assertEquals(Collections.emptyList(), measurement.get("groups"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerFlushNoArgs() {
+    run("PROFILER_FLUSH()", StandAloneProfiler.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerFlushInvalidArg() {
+    run("PROFILER_FLUSH(undefined)", StandAloneProfiler.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
index fd6d122..9ef1805 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
@@ -60,7 +60,7 @@ public class WindowLookbackTest {
     long durationMs = 60000;
     State state = test("1 hour", new Date()
                       , Optional.of(
-                              ImmutableMap.of( ProfilerConfig.PROFILER_PERIOD.getKey(), 1 )
+                              ImmutableMap.of( ProfilerClientConfig.PROFILER_PERIOD.getKey(), 1 )
                                    )
                       ,Assertions.NOT_EMPTY,Assertions.CONTIGUOUS);
     Assert.assertEquals(TimeUnit.HOURS.toMillis(1) / durationMs, state.periods.size());
@@ -114,8 +114,8 @@ public class WindowLookbackTest {
   }
 
   long getDurationMs() {
-    int duration = ProfilerConfig.PROFILER_PERIOD.getDefault(Integer.class);
-    TimeUnit unit = TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.getDefault(String.class));
+    int duration = ProfilerClientConfig.PROFILER_PERIOD.getDefault(Integer.class);
+    TimeUnit unit = TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.getDefault(String.class));
     return unit.toMillis(duration);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
new file mode 100644
index 0000000..ba3c0d8
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -0,0 +1,147 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.profiler.clock.WallClock;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
+/**
+ * Distributes a message along a MessageRoute.  A MessageRoute will lead to one or
+ * more ProfileBuilders.
+ *
+ * A ProfileBuilder is responsible for maintaining the state of a single profile,
+ * for a single entity.  There will be one ProfileBuilder for each (profile, entity) pair.
+ * This class ensures that each ProfileBuilder receives the telemetry messages that
+ * it needs.
+ */
+public class DefaultMessageDistributor implements MessageDistributor {
+
+  /**
+   * The duration of each profile period in milliseconds.
+   */
+  private long periodDurationMillis;
+
+  /**
+   * Maintains the state of a profile which is unique to a profile/entity pair.
+   */
+  private transient Cache<String, ProfileBuilder> profileCache;
+
+  /**
+   * Create a new message distributor.
+   * @param periodDurationMillis The period duration in milliseconds.
+   * @param profileTimeToLiveMillis The TTL of a profile in milliseconds.
+   */
+  public DefaultMessageDistributor(long periodDurationMillis, long profileTimeToLiveMillis) {
+    if(profileTimeToLiveMillis < periodDurationMillis) {
+      throw new IllegalStateException(format(
+              "invalid configuration: expect profile TTL (%d) to be greater than or equal to period duration (%d)",
+              profileTimeToLiveMillis,
+              periodDurationMillis));
+    }
+    this.periodDurationMillis = periodDurationMillis;
+    this.profileCache = CacheBuilder
+            .newBuilder()
+            .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
+            .build();
+  }
+
+  /**
+   * Distribute a message along a MessageRoute.
+   *
+   * @param message The message that needs to be distributed.
+   * @param route The message route.
+   * @param context The Stellar execution context.
+   * @throws ExecutionException If the ProfileBuilder for the route cannot be created.
+   */
+  @Override
+  public void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException {
+    getBuilder(route, context).apply(message);
+  }
+
+  /**
+   * Flushes all profiles.  Flushes all ProfileBuilders that this distributor is responsible for.
+   *
+   * @return The profile measurements; one for each (profile, entity) pair.
+   */
+  @Override
+  public List<ProfileMeasurement> flush() {
+    List<ProfileMeasurement> measurements = new ArrayList<>();
+
+    profileCache.asMap().forEach((key, profileBuilder) -> {
+      if(profileBuilder.isInitialized()) {
+        ProfileMeasurement measurement = profileBuilder.flush();
+        measurements.add(measurement);
+      }
+    });
+
+    profileCache.cleanUp();
+    return measurements;
+  }
+
+  /**
+   * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile.  If none exists,
+   * one will be created and returned.
+   * @param route The message route.
+   * @param context The Stellar execution context.
+   */
+  public ProfileBuilder getBuilder(MessageRoute route, Context context) throws ExecutionException {
+    ProfileConfig profile = route.getProfileDefinition();
+    String entity = route.getEntity();
+    return profileCache.get(
+            cacheKey(profile, entity),
+            () -> new DefaultProfileBuilder.Builder()
+                    .withDefinition(profile)
+                    .withEntity(entity)
+                    .withPeriodDurationMillis(periodDurationMillis)
+                    .withContext(context)
+                    .withClock(new WallClock())
+                    .build());
+  }
+
+  /**
+   * Builds the key that is used to look up the ProfileBuilder within the cache.
+   * @param profile The profile definition.
+   * @param entity The entity.
+   */
+  private String cacheKey(ProfileConfig profile, String entity) {
+    return format("%s:%s", profile, entity);
+  }
+
+  public DefaultMessageDistributor withPeriodDurationMillis(long periodDurationMillis) {
+    this.periodDurationMillis = periodDurationMillis;
+    return this;
+  }
+
+  public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units) {
+    return withPeriodDurationMillis(units.toMillis(duration));
+  }
+}
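
Editor's note, not part of the commit: the Javadoc above describes one ProfileBuilder per (profile, entity) pair, with flush() emitting one measurement for each builder that has been initialized. The sketch below illustrates that bookkeeping using only the JDK. The names DistributorSketch and CountingBuilder are hypothetical, a plain map stands in for the Guava cache (so TTL expiry is not modelled), and resetting a builder after a flush is an assumption about DefaultProfileBuilder, which is not shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only; not part of the commit.  A plain map stands in
 * for the Guava cache, so profile expiry (the TTL) is not modelled.
 */
public class DistributorSketch {

  /** Hypothetical stand-in for a ProfileBuilder: counts applied messages. */
  static final class CountingBuilder {
    final String profile;
    final String entity;
    int count = 0;

    CountingBuilder(String profile, String entity) {
      this.profile = profile;
      this.entity = entity;
    }
  }

  // one builder per "profile:entity" key, kept in insertion order
  private final Map<String, CountingBuilder> builders = new LinkedHashMap<>();

  /** Applies one message to the builder for the pair, creating the builder on first use. */
  public void distribute(String profile, String entity) {
    String key = String.format("%s:%s", profile, entity);
    builders.computeIfAbsent(key, k -> new CountingBuilder(profile, entity)).count++;
  }

  /** Emits "profile:entity=count" for each builder that has seen a message, then resets it (assumed behavior). */
  public List<String> flush() {
    List<String> measurements = new ArrayList<>();
    for (CountingBuilder b : builders.values()) {
      if (b.count > 0) {
        measurements.add(String.format("%s:%s=%d", b.profile, b.entity, b.count));
        b.count = 0;
      }
    }
    return measurements;
  }
}
```

Two messages for the same entity share one builder, while a message for a second entity creates a second builder, so a flush returns two measurements.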

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
new file mode 100644
index 0000000..5a32e69
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Routes incoming telemetry messages.
+ *
+ * A single telemetry message may need to take multiple routes.  This is the case
+ * when a message is needed by more than one profile.
+ */
+public class DefaultMessageRouter implements MessageRouter {
+
+  /**
+   * Executes Stellar code.
+   */
+  private StellarStatefulExecutor executor;
+
+  public DefaultMessageRouter(Context context) {
+    this.executor = new DefaultStellarStatefulExecutor();
+    StellarFunctions.initialize(context);
+    executor.setContext(context);
+  }
+
+  /**
+   * Route a telemetry message.  Finds all routes for a given telemetry message.
+   *
+   * @param message The telemetry message that needs to be routed.
+   * @param config The configuration for the Profiler.
+   * @param context The Stellar execution context.
+   * @return A list of all the routes for the message.
+   */
+  @Override
+  public List<MessageRoute> route(JSONObject message, ProfilerConfig config, Context context) {
+    List<MessageRoute> routes = new ArrayList<>();
+    @SuppressWarnings("unchecked")
+    final Map<String, Object> state = (Map<String, Object>) message;
+
+    // attempt to route the message to each of the profiles
+    for (ProfileConfig profile: config.getProfiles()) {
+
+      // is this message needed by this profile?
+      if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
+
+        // what is the name of the entity in this message?
+        String entity = executor.execute(profile.getForeach(), state, String.class);
+        routes.add(new MessageRoute(profile, entity));
+      }
+    }
+
+    return routes;
+  }
+}
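
Editor's note, not part of the commit: route() above evaluates each profile's 'onlyif' expression against the message and, when it passes, evaluates 'foreach' to name the entity. The sketch below shows the same loop with plain Java lambdas standing in for the Stellar expressions. RouterSketch and its nested Profile class are hypothetical names, and a route is represented as a "profile:entity" string rather than a MessageRoute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Illustrative sketch only; not part of the commit. */
public class RouterSketch {

  /** Hypothetical profile definition; lambdas replace the Stellar 'onlyif' and 'foreach' expressions. */
  public static final class Profile {
    final String name;
    final Predicate<Map<String, Object>> onlyIf;
    final Function<Map<String, Object>, String> forEach;

    public Profile(String name,
                   Predicate<Map<String, Object>> onlyIf,
                   Function<Map<String, Object>, String> forEach) {
      this.name = name;
      this.onlyIf = onlyIf;
      this.forEach = forEach;
    }
  }

  /** Returns one "profile:entity" route for each profile whose onlyIf accepts the message. */
  public static List<String> route(Map<String, Object> message, List<Profile> profiles) {
    List<String> routes = new ArrayList<>();
    for (Profile profile : profiles) {
      // is this message needed by this profile?
      if (profile.onlyIf.test(message)) {
        // what is the name of the entity in this message?
        String entity = profile.forEach.apply(message);
        routes.add(profile.name + ":" + entity);
      }
    }
    return routes;
  }
}
```

A message may produce zero, one, or several routes, depending on how many profiles accept it.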

