metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-391 Create Stellar Function to Read Profile Data for Model Scoring (nickwallen via cestella) closes apache/incubator-metron#242
Date Tue, 13 Sep 2016 18:36:33 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master eaf625a79 -> ef3e9fa5e


METRON-391 Create Stellar Function to Read Profile Data for Model Scoring (nickwallen via cestella) closes apache/incubator-metron#242


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ef3e9fa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ef3e9fa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ef3e9fa5

Branch: refs/heads/master
Commit: ef3e9fa5ea506f3ba624ca2102ea9028782dd974
Parents: eaf625a
Author: nickwallen <nick@nickallen.org>
Authored: Tue Sep 13 14:36:20 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Tue Sep 13 14:36:20 2016 -0400

----------------------------------------------------------------------
 .../profiler/client/HBaseProfilerClient.java    |  46 ++--
 .../profiler/client/stellar/GetProfile.java     | 269 +++++++++++++++++++
 .../metron/profiler/client/GetProfileTest.java  | 234 ++++++++++++++++
 .../client/HBaseProfilerClientTest.java         |  93 ++-----
 .../metron/profiler/client/ProfileWriter.java   |  97 +++++++
 .../metron/profiler/ProfileMeasurement.java     |  14 +-
 .../profiler/bolt/ProfileBuilderBolt.java       |   1 +
 .../profiler/bolt/ProfileSplitterBolt.java      |   5 +-
 .../stellar/DefaultStellarExecutor.java         |  63 +++--
 .../profiler/stellar/StellarExecutor.java       |  30 ++-
 .../stellar/DefaultStellarExecutorTest.java     |  19 ++
 .../org/apache/metron/common/dsl/Context.java   |   3 +-
 .../metron/common/dsl/FunctionResolver.java     |  23 ++
 .../common/dsl/FunctionResolverSingleton.java   |  30 ++-
 .../metron/common/dsl/StellarFunctionInfo.java  |  33 ++-
 .../src/main/config/zookeeper/global.json       |  12 +-
 16 files changed, 832 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index b4e52b5..97691d4 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -64,43 +64,57 @@ public class HBaseProfilerClient implements ProfilerClient {
   /**
    * Fetches all of the data values associated with a Profile.
    *
-   * @param profile The name of the profile.
-   * @param entity The name of the entity.
+   * @param profile     The name of the profile.
+   * @param entity      The name of the entity.
    * @param durationAgo How far in the past to fetch values from.
-   * @param unit The time unit of 'durationAgo'.
-   * @param groups The groups
-   * @param <T> The type of values stored by the Profile.
+   * @param unit        The time unit of 'durationAgo'.
+   * @param groups      The groups
+   * @param <T>         The type of values stored by the Profile.
    * @return A list of profile values.
    */
   @Override
   public <T> List<T> fetch(String profile, String entity, long durationAgo, TimeUnit unit, Class<T> clazz, List<Object> groups) {
+    byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
+    byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
 
     // find all the row keys that satisfy this fetch
     List<byte[]> keysToFetch = rowKeyBuilder.rowKeys(profile, entity, groups, durationAgo, unit);
-    byte[] columnFamilyBytes = Bytes.toBytes(columnBuilder.getColumnFamily());
-    byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
 
     // create a Get for each of the row keys
     List<Get> gets = keysToFetch
             .stream()
-            .map(k -> new Get(k).addColumn(columnFamilyBytes, columnQualifier))
+            .map(k -> new Get(k).addColumn(columnFamily, columnQualifier))
             .collect(Collectors.toList());
 
-    // submit the gets to HBase
-    try {
-      List<T> values = new ArrayList<>();
+    // get the 'gets'
+    return get(gets, columnQualifier, columnFamily, clazz);
+  }
 
+  /**
+   * Submits multiple Gets to HBase and deserialize the results.
+   *
+   * @param gets            The gets to submit to HBase.
+   * @param columnQualifier The column qualifier.
+   * @param columnFamily    The column family.
+   * @param clazz           The type expected in return.
+   * @param <T>             The type expected in return.
+   * @return
+   */
+  private <T> List<T> get(List<Get> gets, byte[] columnQualifier, byte[] columnFamily, Class<T> clazz) {
+    List<T> values = new ArrayList<>();
+
+    try {
       Result[] results = table.get(gets);
       Arrays.stream(results)
-              .filter(r -> r.containsColumn(columnFamilyBytes, columnQualifier))
-              .map(r -> r.getValue(columnFamilyBytes, columnQualifier))
+              .filter(r -> r.containsColumn(columnFamily, columnQualifier))
+              .map(r -> r.getValue(columnFamily, columnQualifier))
               .forEach(val -> values.add(Serializer.fromBytes(val, clazz)));
 
-      return values;
-
     } catch(IOException e) {
       throw new RuntimeException(e);
     }
+
+    return values;
   }
 
   public void setTable(HTableInterface table) {
@@ -114,4 +128,4 @@ public class HBaseProfilerClient implements ProfilerClient {
   public void setColumnBuilder(ColumnBuilder columnBuilder) {
     this.columnBuilder = columnBuilder;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
new file mode 100644
index 0000000..ffe9470
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -0,0 +1,269 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.ProfilerClient;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
+/**
+ * A Stellar function that can retrieve data contained within a Profile.
+ *
+ *  PROFILE_GET
+ *
+ * Retrieve all values for 'entity1' from 'profile1' over the past 4 hours.
+ *
+ *   <code>PROFILE_GET('profile1', 'entity1', 4, 'HOURS')</code>
+ *
+ * Retrieve all values for 'entity1' from 'profile1' over the past 2 days.
+ *
+ *   <code>PROFILE_GET('profile1', 'entity1', 2, 'DAYS')</code>
+ *
+ * Retrieve all values for 'entity1' from 'profile1' that occurred on 'weekdays' over the past month.
+ *
+ *   <code>PROFILE_GET('profile1', 'entity1', 1, 'MONTHS', 'weekdays')</code>
+ *
+ */
+@Stellar(
+        namespace="PROFILE",
+        name="GET",
+        description="Retrieves a series of values from a stored profile.",
+        params={
+          "profile - The name of the profile.",
+          "entity - The name of the entity.",
+          "durationAgo - How long ago should values be retrieved from?",
+          "units - The units of 'durationAgo'.",
+          "groups - Optional - The groups used to sort the profile."
+        },
+        returns="The profile measurements."
+)
+public class GetProfile implements StellarFunction {
+
+  /**
+   * A global property that defines the name of the HBase table storing profile definitions.
+   */
+  public static final String PROFILER_HBASE_TABLE = "profiler.hbase.table";
+
+  /**
+   * A global property that defines the name of the column family used to store profile data.
+   */
+  public static final String PROFILER_COLUMN_FAMILY = "profiler.column.family";
+
+  /**
+   * A global property that defines the name of the HBaseTableProvider implementation class.
+   */
+  public static final String PROFILER_HBASE_TABLE_PROVIDER = "profiler.hbase.table.provider";
+
+  /**
+   * A client that can retrieve profile values.
+   */
+  private ProfilerClient client;
+
+  /**
+   * Initialization.
+   */
+  @Override
+  public void initialize(Context context) {
+
+    // ensure the required capabilities are defined
+    Context.Capabilities[] required = { GLOBAL_CONFIG };
+    validateCapabilities(context, required);
+    Map<String, Object> global = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG).get();
+
+    // create the profiler client
+    ColumnBuilder columnBuilder = getColumnBuilder(global);
+    RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(global);
+    HTableInterface table = getTable(global);
+    client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+  }
+
+  /**
+   * Is the function initialized?
+   */
+  @Override
+  public boolean isInitialized() {
+    return client != null;
+  }
+
+  /**
+   * Apply the function.
+   * @param args The function arguments.
+   * @param context
+   */
+  @Override
+  public Object apply(List<Object> args, Context context) throws ParseException {
+
+    String profile = getArg(0, String.class, args);
+    String entity = getArg(1, String.class, args);
+    long durationAgo = getArg(2, Long.class, args);
+    String unitsName = getArg(3, String.class, args);
+    TimeUnit units = TimeUnit.valueOf(unitsName);
+    List<Object> groups = getGroupsArg(4, args);
+
+    return client.fetch(profile, entity, durationAgo, units, Integer.class, groups);
+  }
+
+  /**
+   * Get the groups defined by the user.
+   *
+   * The user can specify 0 or more groups.  All arguments from the specified position
+   * on are assumed to be groups.  If there is no argument in the specified position,
+   * then it is assumed the user did not specify any groups.
+   *
+   * @param startIndex The starting index of groups within the function argument list.
+   * @param args The function arguments.
+   * @return The groups.
+   */
+  private List<Object> getGroupsArg(int startIndex, List<Object> args) {
+    List<Object> groups = new ArrayList<>();
+
+    for(int i=startIndex; i<args.size(); i++) {
+      String group = getArg(i, String.class, args);
+      groups.add(group);
+    }
+
+    return groups;
+  }
+
+  /**
+   * Ensure that the required capabilities are defined.
+   * @param context The context to validate.
+   * @param required The required capabilities.
+   * @throws IllegalStateException if all of the required capabilities are not present in the Context.
+   */
+  private void validateCapabilities(Context context, Context.Capabilities[] required) throws IllegalStateException {
+
+    // collect the name of each missing capability
+    String missing = Stream
+            .of(required)
+            .filter(c -> !context.getCapability(c).isPresent())
+            .map(c -> c.toString())
+            .collect(Collectors.joining(", "));
+
+    if(StringUtils.isNotBlank(missing) || context == null) {
+      throw new IllegalStateException("missing required context: " + missing);
+    }
+  }
+
+  /**
+   * Get an argument from a list of arguments.
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  private <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
+    }
+
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  /**
+   * Creates the ColumnBuilder to use in accessing the profile data.
+   * @param global The global configuration.
+   */
+  private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
+    // the builder is not currently configurable - but should be made so
+    ColumnBuilder columnBuilder;
+
+    if(global.containsKey(PROFILER_COLUMN_FAMILY)) {
+      String columnFamily = (String) global.get(PROFILER_COLUMN_FAMILY);
+      columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+    } else {
+      columnBuilder = new ValueOnlyColumnBuilder();
+    }
+
+    return columnBuilder;
+  }
+
+  /**
+   * Creates the ColumnBuilder to use in accessing the profile data.
+   * @param global The global configuration.
+   */
+  private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
+    // the builder is not currently configurable - but should be made so
+    return new SaltyRowKeyBuilder();
+  }
+
+  /**
+   * Create an HBase table used when accessing HBase.
+   * @param global The global configuration.
+   * @return
+   */
+  private HTableInterface getTable(Map<String, Object> global) {
+
+    String tableName = (String) global.getOrDefault(PROFILER_HBASE_TABLE, "profiler");
+    TableProvider provider = getTableProvider(global);
+
+    try {
+      return provider.getTable(HBaseConfiguration.create(), tableName);
+
+    } catch (IOException e) {
+      throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName));
+    }
+  }
+
+  /**
+   * Create the TableProvider to use when accessing HBase.
+   * @param global The global configuration.
+   */
+  private TableProvider getTableProvider(Map<String, Object> global) {
+    String clazzName = (String) global.getOrDefault(PROFILER_HBASE_TABLE_PROVIDER, HTableProvider.class.getName());
+
+    TableProvider provider;
+    try {
+      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
+      provider = clazz.newInstance();
+
+    } catch (Exception e) {
+      provider = new HTableProvider();
+    }
+
+    return provider;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
new file mode 100644
index 0000000..e68d52d
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
@@ -0,0 +1,234 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.FunctionResolverSingleton;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
+import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.metron.test.mock.MockHTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_HBASE_TABLE_PROVIDER;
+
+/**
+ * Tests the GetProfile class.
+ */
+public class GetProfileTest {
+
+  private static final String tableName = "profiler";
+  private static final String columnFamily = "P";
+  private StellarExecutor executor;
+  private Map<String, Object> state;
+  private ProfileWriter profileWriter;
+
+  /**
+   * A TableProvider that allows us to mock HBase.
+   */
+  public static class MockTableProvider implements TableProvider, Serializable {
+
+    MockHTable.Provider provider = new MockHTable.Provider();
+
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      return provider.getTable(config, tableName);
+    }
+  }
+
+  private <T> T run(String expression, Class<T> clazz) {
+    return executor.execute(expression, state, clazz);
+  }
+
+  @Before
+  public void setup() {
+    state = new HashMap<>();
+    final HTableInterface table = MockHTable.Provider.addToCache(tableName, columnFamily);
+
+    // used to write values to be read during testing
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE, tableName);
+      put(PROFILER_COLUMN_FAMILY, columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER, MockTableProvider.class.getName());
+    }};
+
+    // create the necessary context
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .build();
+
+    // initialize the executor with that context
+    executor = new DefaultStellarExecutor();
+    executor.setContext(context);
+
+    // force re-initialization before each test
+    FunctionResolverSingleton.getInstance().reset();
+  }
+
+  /**
+   * Values should be retrievable that have NOT been stored within a group.
+   */
+  @Test
+  public void testWithNoGroups() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // execute - read the profile values - no groups
+    String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS')";
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group'.
+   */
+  @Test
+  public void testWithOneGroup() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekends");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', 'weekends')";
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group'.
+   */
+  @Test
+  public void testWithTwoGroups() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekdays", "tuesday");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', 'weekdays', 'tuesday')";
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Initialization should fail if the required context values are missing.
+   */
+  @Test(expected = ParseException.class)
+  public void testMissingContext() {
+    Context empty = Context.EMPTY_CONTEXT();
+
+    // 'unset' the context that was created during setup()
+    executor.setContext(empty);
+
+    // force re-initialization with no context
+    FunctionResolverSingleton.getInstance().initialize(empty);
+
+    // validate - function should be unable to initialize
+    String expr = "PROFILE_GET('profile1', 'entity1', 1000, 'SECONDS', groups)";
+    run(expr, List.class);
+  }
+
+  /**
+   * If the time horizon specified does not include any profile measurements, then
+   * none should be returned.
+   */
+  @Test
+  public void testOutsideTimeHorizon() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write a single value from 2 hours ago
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, 1, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', 4, 'SECONDS')";
+    List<Integer> result = run(expr, List.class);
+
+    // validate - there should be no values from only 4 seconds ago
+    Assert.assertEquals(0, result.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 7e13f3b..087bffa 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -23,20 +23,15 @@ package org.apache.metron.profiler.client;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.client.HBaseClient;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.ProfilePeriod;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
 import org.apache.metron.profiler.stellar.StellarExecutor;
-import org.apache.storm.hbase.common.ColumnList;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -52,10 +47,11 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests the HBaseProfilerClient.
  *
- * The naming used in this test attempts to be as similar to how the 'groupBy' functionality might be used 'in
- * the wild'.  This test involves reading and writing two separate groups originating from the same Profile and
- * Entity.  There is a 'weekdays' group which contains all measurements taken on weekdays.  There is also a
- * 'weekend' group which contains all measurements taken on weekends.
+ * The naming used in this test attempts to be as similar to how the 'groupBy'
+ * functionality might be used 'in the wild'.  This test involves reading and
+ * writing two separate groups originating from the same Profile and Entity.
+ * There is a 'weekdays' group which contains all measurements taken on weekdays.
+ * There is also a 'weekend' group which contains all measurements taken on weekends.
  */
 public class HBaseProfilerClientTest {
 
@@ -63,12 +59,10 @@ public class HBaseProfilerClientTest {
   private static final String columnFamily = "P";
 
   private HBaseProfilerClient client;
-  private HBaseClient hbaseClient;
-  private RowKeyBuilder rowKeyBuilder;
-  private ColumnBuilder columnBuilder;
   private HTableInterface table;
   private StellarExecutor executor;
   private static HBaseTestingUtility util;
+  private ProfileWriter profileWriter;
 
   @BeforeClass
   public static void startHBase() throws Exception {
@@ -88,70 +82,28 @@ public class HBaseProfilerClientTest {
   @Before
   public void setup() throws Exception {
 
-    // setup all of the necessary dependencies
     table = util.createTable(Bytes.toBytes(tableName), Bytes.toBytes(columnFamily));
-    hbaseClient = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
     executor = new DefaultStellarExecutor();
-    rowKeyBuilder = new SaltyRowKeyBuilder();
-    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+    // used to write values to be read during testing
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
 
     // what we're actually testing
     client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
   }
 
-  /**
-   * Writes profile measurements that can be used for testing.
-   * @param count The number of profile measurements to write.
-   * @param profileName Name of the profile.
-   * @param entityName Name of the entity.
-   * @param value The measurement value.
-   * @param periodsPerHour Number of profile periods per hour.
-   * @param startTime When measurements should start to be written.
-   * @param group The name of the group.
-   */
-  private void writeMeasurements(int count, String profileName, String entityName, int value, int periodsPerHour, long startTime, List<Object> group) {
-
-    // create the first measurement
-    ProfileMeasurement m = new ProfileMeasurement(profileName, entityName, startTime, periodsPerHour);
-    m.setValue(value);
-
-    for(int i=0; i<count; i++) {
-
-      // create a measurement for the next profile period
-      ProfilePeriod next = m.getPeriod().next();
-      m = new ProfileMeasurement(profileName, entityName, next.getTimeInMillis(), periodsPerHour);
-      m.setValue(value);
-
-      // write the measurement
-      write(m, group);
-    }
-  }
-
   @After
   public void tearDown() throws Exception {
     util.deleteTable(tableName);
   }
 
   /**
-   * Write a ProfileMeasurement.
-   * @param m The ProfileMeasurement to write.
-   * @param groups The groups to use when writing the ProfileMeasurement.
-   */
-  private void write(ProfileMeasurement m, List<Object> groups) {
-
-    byte[] rowKey = rowKeyBuilder.rowKey(m, groups);
-    ColumnList cols = columnBuilder.columns(m);
-
-    List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, Durability.SKIP_WAL);
-    hbaseClient.batchMutate(mutations);
-  }
-
-  /**
    * The client should be able to distinguish between groups and only fetch those in the correct group.
    */
   @Test
   public void testFetchOneGroup() throws Exception {
-
     final int periodsPerHour = 4;
     final int expectedValue = 2302;
     final int hours = 2;
@@ -159,8 +111,9 @@ public class HBaseProfilerClientTest {
     final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
 
     // setup - write two groups of measurements - 'weekends' and 'weekdays'
-    writeMeasurements(count, "profile1", "entity1", expectedValue, periodsPerHour, startTime, Arrays.asList("weekdays"));
-    writeMeasurements(count, "profile1", "entity1", 0, periodsPerHour, startTime, Arrays.asList("weekends"));
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
+    profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
     // execute
     List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, Arrays.asList("weekdays"));
@@ -184,11 +137,13 @@ public class HBaseProfilerClientTest {
     final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
 
     // create two groups of measurements - one on weekdays and one on weekends
-    writeMeasurements(count, "profile1", "entity1", expectedValue, periodsPerHour, startTime, Arrays.asList("weekdays"));
-    writeMeasurements(count, "profile1", "entity1", 0, periodsPerHour, startTime, Arrays.asList("weekends"));
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
+    profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
     // execute
-    List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, Arrays.asList("does-not-exist"));
+    List<Object> doesNotExist = Arrays.asList("does-not-exist");
+    List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, doesNotExist);
 
     // validate
     assertEquals(0, results.size());
@@ -200,13 +155,15 @@ public class HBaseProfilerClientTest {
    */
   @Test
   public void testFetchOutsideTimeWindow() throws Exception {
-
-    // setup - create some measurement values from a day ago
     final int periodsPerHour = 4;
     final int hours = 2;
+    int numberToWrite = hours * periodsPerHour;
     final List<Object> group = Arrays.asList("weekends");
     final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
-    writeMeasurements(hours * periodsPerHour, "profile1", "entity1", 1000, 4, startTime, group);
+
+    // setup - write some values to read later
+    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+    profileWriter.write(m, numberToWrite, group, val -> 1000);
 
     // execute
     List<Integer> results = client.fetch("profile1", "entity1", 2, TimeUnit.MILLISECONDS, Integer.class, group);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
new file mode 100644
index 0000000..95be44c
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -0,0 +1,97 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.storm.hbase.common.ColumnList;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Writes ProfileMeasurement values that can be read during automated testing.
+ */
+public class ProfileWriter {
+
+  private RowKeyBuilder rowKeyBuilder;
+  private ColumnBuilder columnBuilder;
+  private HBaseClient hbaseClient;
+  private HBaseProfilerClient client;
+
+  public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table) {
+    this.rowKeyBuilder = rowKeyBuilder;
+    this.columnBuilder = columnBuilder;
+    this.hbaseClient = new HBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
+    this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+  }
+
+  /**
+   * Writes profile measurements that can be used for testing.
+   *
+   * @param prototype      A prototype for the types of ProfileMeasurements that should be written.
+   * @param count          The number of profile measurements to write.
+   * @param group          The name of the group.
+   * @param valueGenerator A function that consumes the previous ProfileMeasurement value and produces the next.
+   */
+  public void write(ProfileMeasurement prototype, int count, List<Object> group, Function<Object, Object> valueGenerator) {
+
+    ProfileMeasurement m = prototype;
+    for(int i=0; i<count; i++) {
+
+      // create a measurement for the next profile period to be written
+      ProfilePeriod next = m.getPeriod().next();
+      m = new ProfileMeasurement(
+              prototype.getProfileName(),
+              prototype.getEntity(),
+              next.getTimeInMillis(),
+              prototype.getPeriodsPerHour());
+
+      // generate the next value that should be written
+      Object nextValue = valueGenerator.apply(m.getValue());
+      m.setValue(nextValue);
+
+      // write the measurement
+      write(m, group);
+    }
+  }
+
+  /**
+   * Write a ProfileMeasurement.
+   * @param m The ProfileMeasurement to write.
+   * @param groups The groups to use when writing the ProfileMeasurement.
+   */
+  private void write(ProfileMeasurement m, List<Object> groups) {
+
+    byte[] rowKey = rowKeyBuilder.rowKey(m, groups);
+    ColumnList cols = columnBuilder.columns(m);
+
+    List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, Durability.SKIP_WAL);
+    hbaseClient.batchMutate(mutations);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index bea0b34..0c94879 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -51,6 +51,11 @@ public class ProfileMeasurement {
   private List<String> groupBy;
 
   /**
+   * The number of profile periods per hour.
+   */
+  private int periodsPerHour;
+
+  /**
    * The period in which the ProfileMeasurement was taken.
    */
   private ProfilePeriod period;
@@ -64,6 +69,7 @@ public class ProfileMeasurement {
   public ProfileMeasurement(String profileName, String entity, long epochMillis, int periodsPerHour) {
     this.profileName = profileName;
     this.entity = entity;
+    this.periodsPerHour = periodsPerHour;
     this.period = new ProfilePeriod(epochMillis, periodsPerHour);
   }
 
@@ -95,13 +101,17 @@ public class ProfileMeasurement {
     this.groupBy = groupBy;
   }
 
+  public int getPeriodsPerHour() {
+    return periodsPerHour;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
 
     ProfileMeasurement that = (ProfileMeasurement) o;
-
+    if (periodsPerHour != that.periodsPerHour) return false;
     if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
     if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
     if (value != null ? !value.equals(that.value) : that.value != null) return false;
@@ -116,6 +126,7 @@ public class ProfileMeasurement {
     result = 31 * result + (entity != null ? entity.hashCode() : 0);
     result = 31 * result + (value != null ? value.hashCode() : 0);
     result = 31 * result + (groupBy != null ? groupBy.hashCode() : 0);
+    result = 31 * result + periodsPerHour;
     result = 31 * result + (period != null ? period.hashCode() : 0);
     return result;
   }
@@ -127,6 +138,7 @@ public class ProfileMeasurement {
             ", entity='" + entity + '\'' +
             ", value=" + value +
             ", groupBy=" + groupBy +
+            ", periodsPerHour=" + periodsPerHour +
             ", period=" + period +
             '}';
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 79c046a..2cd7182 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -112,6 +112,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   protected void initializeStellar() {
     Context context = new Context.Builder()
             .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
             .build();
     StellarFunctions.initialize(context);
     executor.setContext(context);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index d389410..5d97e4d 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -82,8 +82,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
 
   protected void initializeStellar() {
     Context context = new Context.Builder()
-                         .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-                         .build();
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+            .build();
     StellarFunctions.initialize(context);
     executor.setContext(context);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
index ae786f2..474ac73 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
@@ -21,6 +21,7 @@
 package org.apache.metron.profiler.stellar;
 
 import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.FunctionResolver;
 import org.apache.metron.common.dsl.MapVariableResolver;
 import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.common.dsl.StellarFunctions;
@@ -44,7 +45,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
 
   /**
    * Provides additional context for initializing certain Stellar functions.  For
-   * example, references to find Zookeeper or HBase.
+   * example, references to Zookeeper or HBase.
    */
   private Context context;
 
@@ -61,41 +62,50 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
     this.state = new HashMap<>(initialState);
   }
 
+  /**
+   * The current state of the Stellar execution environment.
+   */
   @Override
   public Map<String, Object> getState() {
     return new HashMap<>(state);
   }
 
   /**
-   * Execute an expression and assign the result to a variable.
+   * Execute an expression and assign the result to a variable.  The variable is maintained
+   * in the context of this executor and is available to all subsequent expressions.
    *
-   * @param variable The variable name to assign to.
-   * @param expression The expression to execute.
-   * @param message The message that provides additional context for the expression.
+   * @param variable       The name of the variable to assign to.
+   * @param expression     The expression to execute.
+   * @param transientState Additional state available to the expression.  This most often represents
+   *                       the values available to the expression from an individual message. The state
+   *                       maps a variable name to a variable's value.
    */
   @Override
-  public void assign(String variable, String expression, Map<String, Object> message) {
-    Object result = execute(expression, message);
+  public void assign(String variable, String expression, Map<String, Object> transientState) {
+    Object result = execute(expression, transientState);
     state.put(variable, result);
   }
 
   /**
-   * Execute a Stellar expression and returns the result.
+   * Execute a Stellar expression and return the result.  The internal state of the executor
+   * is not modified.
    *
-   * @param expr The expression to execute.
-   * @param message The message that is accessible when Stellar is executed.
-   * @param clazz The expected class of the expression's result.
-   * @param <T> The expected class of the expression's result.
+   * @param expression The expression to execute.
+   * @param state      Additional state available to the expression.  This most often represents
+   *                   the values available to the expression from an individual message. The state
+   *                   maps a variable name to a variable's value.
+   * @param clazz      The expected type of the expression's result.
+   * @param <T>        The expected type of the expression's result.
    */
   @Override
-  public <T> T execute(String expr, Map<String, Object> message, Class<T> clazz) {
-    Object resultObject = execute(expr, message);
+  public <T> T execute(String expression, Map<String, Object> state, Class<T> clazz) {
+    Object resultObject = execute(expression, state);
 
     // perform type conversion, if necessary
     T result = ConversionUtils.convert(resultObject, clazz);
-    if(result == null) {
+    if (result == null) {
       throw new IllegalArgumentException(String.format("Unexpected type: expected=%s, actual=%s, expression=%s",
-              clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expr));
+              clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expression));
     }
 
     return result;
@@ -109,6 +119,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
   /**
    * Sets the Context for the Stellar execution environment.  This provides global data used
    * to initialize Stellar functions.
+   *
    * @param context The Stellar context.
    */
   @Override
@@ -119,17 +130,15 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
   /**
    * Execute a Stellar expression.
    *
-   * @param expr The expression to execute.
-   * @param msg The message that is accessible when Stellar is executed.
+   * @param expression     The expression to execute.
+   * @param transientState Additional state available to the expression.  This most often represents
+   *                       the values available to the expression from an individual message. The state
+   *                       maps a variable name to a variable's value.
    */
-  private Object execute(String expr, Map<String, Object> msg) {
-    try {
-      VariableResolver resolver = new MapVariableResolver(state, msg);
-      StellarProcessor processor = new StellarProcessor();
-      return processor.parse(expr, resolver, StellarFunctions.FUNCTION_RESOLVER(), context);
-
-    } catch (ParseException e) {
-      throw new ParseException(String.format("Bad expression: expr=%s, msg=%s, state=%s", expr, msg, state));
-    }
+  private Object execute(String expression, Map<String, Object> transientState) {
+    FunctionResolver functionResolver = StellarFunctions.FUNCTION_RESOLVER();
+    VariableResolver variableResolver = new MapVariableResolver(state, transientState);
+    StellarProcessor processor = new StellarProcessor();
+    return processor.parse(expression, variableResolver, functionResolver, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
index 66e3ad1..869db42 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
@@ -21,37 +21,38 @@
 package org.apache.metron.profiler.stellar;
 
 import org.apache.metron.common.dsl.Context;
-import org.json.simple.JSONObject;
 
 import java.util.Map;
 
 /**
  * Executes Stellar expressions and maintains state across multiple invocations.
- *
- * There are two sets of functions in Stellar currently.  One can be executed with
- * a PredicateProcessor and the other a TransformationProcessor.  This interface
- * abstracts away that complication.
  */
 public interface StellarExecutor {
 
   /**
-   * Execute an expression and assign the result to a variable.
+   * Execute an expression and assign the result to a variable.  The variable is maintained
+   * in the context of this executor and is available to all subsequent expressions.
    *
-   * @param variable The name of the variable to assign to.
+   * @param variable   The name of the variable to assign to.
    * @param expression The expression to execute.
-   * @param message The message that provides additional context for the expression.
+   * @param state      Additional state available to the expression.  This most often represents
+   *                   the values available to the expression from an individual message. The state
+   *                   maps a variable name to a variable's value.
    */
-  void assign(String variable, String expression, Map<String, Object> message);
+  void assign(String variable, String expression, Map<String, Object> state);
 
   /**
-   * Execute a Stellar expression and return the result.
+   * Execute a Stellar expression and return the result.  The internal state of the executor
+   * is not modified.
    *
    * @param expression The expression to execute.
-   * @param message A map of values, most often the JSON message itself, that is accessible within Stellar.
-   * @param clazz The expected class of the expression's result.
-   * @param <T> The expected class of the expression's result.
+   * @param state      Additional state available to the expression.  This most often represents
+   *                   the values available to the expression from an individual message. The state
+   *                   maps a variable name to a variable's value.
+   * @param clazz      The expected type of the expression's result.
+   * @param <T>        The expected type of the expression's result.
    */
-  <T> T execute(String expression, Map<String, Object> message, Class<T> clazz);
+  <T> T execute(String expression, Map<String, Object> state, Class<T> clazz);
 
   /**
    * The current state of the Stellar execution environment.
@@ -66,6 +67,7 @@ public interface StellarExecutor {
   /**
    * Sets the Context for the Stellar execution environment.  This provides global data used
    * to initialize Stellar functions.
+   *
    * @param context The Stellar context.
    */
   void setContext(Context context);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
index 7a5cbcf..9e2a66a 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
@@ -28,6 +28,11 @@ import org.json.simple.parser.ParseException;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -158,4 +163,18 @@ public class DefaultStellarExecutorTest {
     executor.execute("2", message, Short.class);
     executor.execute("2", message, Long.class);
   }
+
+  /**
+   * The executor must be serializable.
+   */
+  @Test
+  public void testSerializable() throws Exception {
+
+    // serialize
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    new ObjectOutputStream(bytes).writeObject(executor);
+
+    // deserialize - success when no exceptions
+    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
index 0731b00..aa8c9a2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
@@ -31,7 +31,8 @@ public class Context implements Serializable {
   public enum Capabilities {
     HBASE_PROVIDER,
     ZOOKEEPER_CLIENT,
-    SERVICE_DISCOVERER
+    SERVICE_DISCOVERER,
+    GLOBAL_CONFIG
   }
 
   public static class Builder {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
index 0e90b84..03fdc5f 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
@@ -19,8 +19,31 @@ package org.apache.metron.common.dsl;
 
 import java.util.function.Function;
 
+/**
+ * Responsible for function resolution in Stellar.
+ */
 public interface FunctionResolver extends Function<String, StellarFunction> {
+
+  /**
+   * Provides metadata about each Stellar function that is resolvable.
+   */
   Iterable<StellarFunctionInfo> getFunctionInfo();
+
+  /**
+   * The names of all Stellar functions that are resolvable.
+   */
   Iterable<String> getFunctions();
+
+  /**
+   * Initialize the function resolver.
+   * @param context Context used to initialize.
+   */
   void initialize(Context context);
+
+  /**
+   * A 'factory reset' of the function resolver.
+   *
+   * Useful primarily for testing purposes only.
+   */
+  void reset();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
index c631878..ed57db0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
@@ -43,18 +43,26 @@ public class FunctionResolverSingleton implements FunctionResolver {
     return INSTANCE;
   }
 
-
-
+  /**
+   * Provides metadata about each Stellar function that is resolvable.
+   */
   @Override
   public Iterable<StellarFunctionInfo> getFunctionInfo() {
     return _getFunctions().values();
   }
 
+  /**
+   * The names of all Stellar functions that are resolvable.
+   */
   @Override
   public Iterable<String> getFunctions() {
     return _getFunctions().keySet();
   }
 
+  /**
+   * Initialize the function resolver.
+   * @param context Context used to initialize.
+   */
   @Override
   public void initialize(Context context) {
     //forces a load of the stellar functions.
@@ -62,6 +70,22 @@ public class FunctionResolverSingleton implements FunctionResolver {
   }
 
   /**
+   * A 'factory reset' of the function resolver.
+   *
+   * Useful primarily for testing purposes only.
+   */
+  @Override
+  public void reset() {
+    lock.writeLock().lock();
+    try {
+      isInitialized.set(false);
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
    * This allows the lazy loading of the functions.  We do not want to take a multi-second hit to analyze the full classpath
    * every time a unit test starts up.  That would cause the runtime of things to blow right up.  Instead, we only want
    * to take the hit if a function is actually called from a stellar expression.
@@ -148,8 +172,6 @@ public class FunctionResolverSingleton implements FunctionResolver {
     return ClasspathHelper.forManifest(ClasspathHelper.forClassLoader(classLoaders));
   }
 
-
-
   private static Map.Entry<String, StellarFunctionInfo> create(Class<? extends StellarFunction> stellarClazz) {
     String fqn = getNameFromAnnotation(stellarClazz);
     if(fqn == null) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
index 8c9745f..e0065e8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
@@ -20,12 +20,36 @@ package org.apache.metron.common.dsl;
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Describes a Stellar function.
+ */
 public class StellarFunctionInfo {
-  String description;
+
+  /**
+   * The name of the function.
+   */
   String name;
+
+  /**
+   * A description of the function.  Used for documentation purposes.
+   */
+  String description;
+
+  /**
+   * A description of what the function returns.  Used for documentation purposes.
+   */
+  String returns;
+
+  /**
+   * The function parameters.  Used for documentation purposes.
+   */
   String[] params;
+
+  /**
+   * The actual function that can be executed.
+   */
   StellarFunction function;
-  String returns;
+
   public StellarFunctionInfo(String description, String name, String[] params, String returns, StellarFunction function) {
     this.description = description;
     this.name = name;
@@ -34,7 +58,9 @@ public class StellarFunctionInfo {
     this.returns = returns;
   }
 
-  public String getReturns() { return returns;}
+  public String getReturns() {
+    return returns;
+  }
 
   public String getDescription() {
     return description;
@@ -65,7 +91,6 @@ public class StellarFunctionInfo {
     // Probably incorrect - comparing Object[] arrays with Arrays.equals
     if (!Arrays.equals(getParams(), that.getParams())) return false;
     return getReturns() != null ? getReturns().equals(that.getReturns()) : that.getReturns() == null;
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
index d294da7..266badf 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
@@ -3,14 +3,20 @@
   "es.ip": "localhost",
   "es.port": 9300,
   "es.date.format": "yyyy.MM.dd.HH",
+
   "solr.zookeeper": "localhost:2181",
   "solr.collection": "metron",
   "solr.numShards": 1,
   "solr.replicationFactor": 1,
+
   "fieldValidations" : [
     {
-       "input" : [ "src_ip_addr", "dst_ip_addr"]
-      ,"validation" : "IP"
+      "input" : [ "src_ip_addr", "dst_ip_addr"],
+      "validation" : "IP"
     }
-                       ]
+  ],
+
+  "profiler.hbase.table": "profiler",
+  "profiler.column.family": "P",
+  "profiler.hbase.table.provider": "org.apache.metron.hbase.HTableProvider"
 }
\ No newline at end of file


Mime
View raw message