metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [1/2] incubator-metron git commit: METRON-389 Create Java API to Read Profile Data During Model Scoring (nickwallen) closes apache/incubator-metron#236
Date Mon, 12 Sep 2016 15:19:22 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 2e5538c4e -> 3dfd7be6f


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
new file mode 100644
index 0000000..e78a8ea
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
@@ -0,0 +1,211 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A RowKeyBuilder that uses a salt to prevent hot-spotting.
+ *
+ * Responsible for building the row keys used to store profile data in HBase.  The row key is composed of the following
+ * fields in the given order.
+ * <ul>
+ * <li>salt - A salt that helps prevent hot-spotting.
+ * <li>profile - The name of the profile.
+ * <li>entity - The name of the entity being profiled.
+ * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
+ * <li>year - The year based on UTC.
+ * <li>day of year - The current day within the year based on UTC; [1, 366]
+ * <li>hour - The hour within the day based on UTC; [0, 23]
+ * <li>period - The period within the hour.  The number of periods per hour can be defined by the user; defaults to 4.
+ * </ul>
+ */
+public class SaltyRowKeyBuilder implements RowKeyBuilder {
+
+  /**
+   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
+   * divisor is used to generate the salt.  The salt divisor should be roughly equal
+   * to the number of nodes in the Hbase cluster.
+   */
+  private final int saltDivisor;
+
+  /**
+   * An hour is divided into multiple periods.  This defines how many periods
+   * will exist within each given hour.
+   */
+  private final int periodsPerHour;
+
+  /**
+   * Creates a builder with a salt divisor of 1000 and 4 periods per hour.
+   */
+  public SaltyRowKeyBuilder() {
+    this(1000, 4);
+  }
+
+  /**
+   * @param saltDivisor The divisor used to generate the salt.
+   * @param periodsPerHour The number of periods within each hour.
+   */
+  public SaltyRowKeyBuilder(int saltDivisor, int periodsPerHour) {
+    this.saltDivisor = saltDivisor;
+    this.periodsPerHour = periodsPerHour;
+  }
+
+  /**
+   * Builds a list of row keys necessary to retrieve profile measurements over
+   * a time horizon.  The horizon ends at the current system time and begins
+   * 'durationAgo' units before it; one row key is produced for each period
+   * whose start time is not after the end of the horizon.
+   *
+   * @param profile The name of the profile.
+   * @param entity The name of the entity.
+   * @param groups The group(s) used to sort the profile data.
+   * @param durationAgo How long ago?
+   * @param unit The time units of how long ago.
+   * @return All of the row keys necessary to retrieve the profile measurements.
+   */
+  @Override
+  public List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long durationAgo, TimeUnit unit) {
+    List<byte[]> rowKeys = new ArrayList<>();
+
+    // find the time horizon
+    long endTime = System.currentTimeMillis();
+    long startTime = endTime - unit.toMillis(durationAgo);
+
+    // find the starting period and advance until the end time is reached
+    ProfilePeriod period = new ProfilePeriod(startTime, periodsPerHour);
+    while(period.getTimeInMillis() <= endTime) {
+      rowKeys.add(rowKey(profile, entity, period, groups));
+
+      // advance to the next period
+      period = period.next();
+    }
+
+    return rowKeys;
+  }
+
+  /**
+   * Builds the row key for a given profile measurement.
+   * @param m The profile measurement.
+   * @param groups The groups used to sort the profile data.
+   * @return The HBase row key.
+   */
+  @Override
+  public byte[] rowKey(ProfileMeasurement m, List<Object> groups) {
+    return rowKey(m.getProfileName(), m.getEntity(), m.getPeriod(), groups);
+  }
+
+  /**
+   * Build the row key.
+   * @param profile The name of the profile.
+   * @param entity The name of the entity.
+   * @param period The period in which the measurement was taken.
+   * @param groups The groups.
+   * @return The HBase row key.
+   */
+  public byte[] rowKey(String profile, String entity, ProfilePeriod period, List<Object> groups) {
+
+    // row key = salt + prefix + group(s) + time
+    byte[] salt = getSalt(period, saltDivisor);
+    byte[] prefixKey = prefixKey(profile, entity);
+    byte[] groupKey = groupKey(groups);
+    byte[] timeKey = timeKey(period);
+
+    int capacity = salt.length + prefixKey.length + groupKey.length + timeKey.length;
+    return ByteBuffer
+            .allocate(capacity)
+            .put(salt)
+            .put(prefixKey)
+            .put(groupKey)
+            .put(timeKey)
+            .array();
+  }
+
+  /**
+   * Builds the 'prefix' component of the row key.
+   * @param profile The name of the profile.
+   * @param entity The name of the entity.
+   * @return The encoded profile name followed by the encoded entity name.
+   */
+  private static byte[] prefixKey(String profile, String entity) {
+    // encode first and size the buffer by byte count; a character count is too
+    // small as soon as a name contains a multi-byte character
+    byte[] profileBytes = Bytes.toBytes(profile);
+    byte[] entityBytes = Bytes.toBytes(entity);
+
+    return ByteBuffer
+            .allocate(profileBytes.length + entityBytes.length)
+            .put(profileBytes)
+            .put(entityBytes)
+            .array();
+  }
+
+  /**
+   * Builds the 'group' component of the row key.
+   * @param groups The groups to include in the row key.
+   * @return The encoded concatenation of each group's string form, in list order.
+   */
+  private static byte[] groupKey(List<Object> groups) {
+
+    StringBuilder builder = new StringBuilder();
+    for(Object group : groups) {
+      builder.append(group);
+    }
+
+    // the encoded array is already exactly the right size; no buffer copy is needed
+    return Bytes.toBytes(builder.toString());
+  }
+
+  /**
+   * Builds the 'time' portion of the row key
+   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
+   * @return Four ints in order: year, day of year, hour, period.
+   */
+  private static byte[] timeKey(ProfilePeriod period) {
+    return ByteBuffer
+            .allocate(4 * Integer.BYTES)
+            .putInt(period.getYear())
+            .putInt(period.getDayOfYear())
+            .putInt(period.getHour())
+            .putInt(period.getPeriod())
+            .array();
+  }
+
+  /**
+   * Calculates a salt value that is used as part of the row key.
+   *
+   * The salt is calculated as 'md5(time key) % N' where N is a configurable value that ideally
+   * is close to the number of nodes in the Hbase cluster.  The digest is read as a signed
+   * int before the remainder is taken, so the salt may be negative.
+   *
+   * @param period The period in which a profile measurement is taken.
+   * @param saltDivisor The divisor N; a value of zero causes an ArithmeticException.
+   * @return The salt encoded as a single int.
+   */
+  public static byte[] getSalt(ProfilePeriod period, int saltDivisor) {
+    try {
+      MessageDigest digest = MessageDigest.getInstance("MD5");
+      byte[] hash = digest.digest(timeKey(period));
+      int salt = Bytes.toInt(hash) % saltDivisor;
+      return ByteBuffer
+              .allocate(Integer.BYTES)
+              .putInt(salt)
+              .array();
+
+    } catch(NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
new file mode 100644
index 0000000..c0fe16f
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
@@ -0,0 +1,92 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Converts the values that a ProfileMeasurement may carry to and from
+ * their byte representation.
+ */
+public class Serializer {
+
+  private Serializer() {
+    // static helpers only; never instantiated
+  }
+
+  /**
+   * Serialize a profile measurement's value.
+   *
+   * Integer, Double, Short, Long and Float values are supported.  Which of these a
+   * profile produces depends on how the user defined the profile.
+   *
+   * @param value The value to serialize.
+   * @return The bytes representing the value.
+   * @throws RuntimeException If the value is not one of the supported types.
+   */
+  public static byte[] toBytes(Object value) {
+    if(value instanceof Integer) {
+      return Bytes.toBytes((Integer) value);
+    }
+    if(value instanceof Double) {
+      return Bytes.toBytes((Double) value);
+    }
+    if(value instanceof Short) {
+      return Bytes.toBytes((Short) value);
+    }
+    if(value instanceof Long) {
+      return Bytes.toBytes((Long) value);
+    }
+    if(value instanceof Float) {
+      return Bytes.toBytes((Float) value);
+    }
+
+    throw new RuntimeException("Expected 'Number': actual=" + value);
+  }
+
+  /**
+   * Deserialize a profile measurement's value.
+   *
+   * The caller names the type to decode; Integer, Double, Short, Long and Float
+   * are supported.
+   *
+   * @param value The value to deserialize.
+   * @param clazz The type that the bytes should be decoded as.
+   * @return The decoded value.
+   * @throws RuntimeException If the requested type is not one of the supported types.
+   */
+  public static <T> T fromBytes(byte[] value, Class<T> clazz) {
+    if(Integer.class == clazz) {
+      return clazz.cast(Bytes.toInt(value));
+    }
+    if(Double.class == clazz) {
+      return clazz.cast(Bytes.toDouble(value));
+    }
+    if(Short.class == clazz) {
+      return clazz.cast(Bytes.toShort(value));
+    }
+    if(Long.class == clazz) {
+      return clazz.cast(Bytes.toLong(value));
+    }
+    if(Float.class == clazz) {
+      return clazz.cast(Bytes.toFloat(value));
+    }
+
+    throw new RuntimeException("Expected 'Number': actual=" + clazz);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
new file mode 100644
index 0000000..aeda317
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.hbase.common.ColumnList;
+
+/**
+ * A ColumnBuilder that writes only the value of a ProfileMeasurement.
+ */
+public class ValueOnlyColumnBuilder implements ColumnBuilder {
+
+  /**
+   * The column family storing the profile data.
+   */
+  private String columnFamily;
+
+  private byte[] columnFamilyBytes;
+
+  public ValueOnlyColumnBuilder() {
+    setColumnFamily("P");
+  }
+
+  public ValueOnlyColumnBuilder(String columnFamily) {
+    setColumnFamily(columnFamily);
+  }
+
+  @Override
+  public ColumnList columns(ProfileMeasurement measurement) {
+
+    ColumnList cols = new ColumnList();
+    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), Serializer.toBytes(measurement.getValue()));
+
+    return cols;
+  }
+
+  @Override
+  public String getColumnFamily() {
+    return this.columnFamily;
+  }
+
+  public void setColumnFamily(String columnFamily) {
+    this.columnFamily = columnFamily;
+    this.columnFamilyBytes = Bytes.toBytes(columnFamily);
+  }
+
+  @Override
+  public byte[] getColumnQualifier(String fieldName) {
+
+    if("value".equals(fieldName)) {
+      return Bytes.toBytes("value");
+    }
+
+    throw new IllegalArgumentException(("unexpected field name: " + fieldName));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
new file mode 100644
index 0000000..ada7365
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -0,0 +1,341 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the ProfilePeriod class.
+ */
+public class ProfilePeriodTest {
+
+  /**
+   * Thu, Aug 25 2016 13:27:10 GMT
+   *
+   * 238th day of the year
+   */
+  private long AUG2016 = 1472131630748L;
+
+  /**
+   * Asserts the time fields of a period in a fixed order: year, day of year, hour, period.
+   */
+  private static void assertPeriod(ProfilePeriod actual, int year, int dayOfYear, int hour, int period) {
+    assertEquals(year, actual.getYear());
+    assertEquals(dayOfYear, actual.getDayOfYear());
+    assertEquals(hour, actual.getHour());
+    assertEquals(period, actual.getPeriod());
+  }
+
+  /**
+   * Repeatedly advances from the starting period.  After each step the period must fall on
+   * day 238 of 2016, match the expected {hour, period} pair and keep the same periods per hour.
+   */
+  private static void assertNextPeriods(ProfilePeriod start, int periodsPerHour, int[][] hourAndPeriod) {
+    ProfilePeriod current = start;
+    for(int[] expected : hourAndPeriod) {
+      current = current.next();
+      assertPeriod(current, 2016, 238, expected[0], expected[1]);
+      assertEquals(periodsPerHour, current.getPeriodsPerHour());
+    }
+  }
+
+  /**
+   * The first period must always fall on the start of the hour, so 241 periods
+   * per hour is expected to be rejected.
+   */
+  @Test(expected = RuntimeException.class)
+  public void testInvalidPeriodsPerHour() {
+    new ProfilePeriod(AUG2016, 241);
+  }
+
+  @Test
+  public void test1PeriodPerHour() {
+    assertPeriod(new ProfilePeriod(AUG2016, 1), 2016, 238, 13, 0);
+  }
+
+  @Test
+  public void test2PeriodsPerHour() {
+    assertPeriod(new ProfilePeriod(AUG2016, 2), 2016, 238, 13, 0);
+  }
+
+  @Test
+  public void test3PeriodsPerHour() {
+    assertPeriod(new ProfilePeriod(AUG2016, 3), 2016, 238, 13, 1);
+  }
+
+  @Test
+  public void test4PeriodsPerHour() {
+    assertPeriod(new ProfilePeriod(AUG2016, 4), 2016, 238, 13, 1);
+  }
+
+  @Test
+  public void test60PeriodsPerHour() {
+    assertPeriod(new ProfilePeriod(AUG2016, 60), 2016, 238, 13, 27);
+  }
+
+  @Test
+  public void test240PeriodsPerHour() {
+    assertPeriod(new ProfilePeriod(AUG2016, 240), 2016, 238, 13, 108);
+  }
+
+  @Test
+  public void testNextWith2PeriodsPerHour() {
+    int periodsPerHour = 2;
+
+    ProfilePeriod first = new ProfilePeriod(AUG2016, periodsPerHour);
+    assertPeriod(first, 2016, 238, 13, 0);
+
+    // each entry is the {hour, period} expected after one more call to next()
+    assertNextPeriods(first, periodsPerHour, new int[][] {
+            {13, 1}, {14, 0}, {14, 1}, {15, 0}, {15, 1}, {16, 0}
+    });
+  }
+
+  @Test
+  public void testNextWith4PeriodsPerHour() {
+    int periodsPerHour = 4;
+
+    ProfilePeriod first = new ProfilePeriod(AUG2016, periodsPerHour);
+    assertPeriod(first, 2016, 238, 13, 1);
+
+    // each entry is the {hour, period} expected after one more call to next()
+    assertNextPeriods(first, periodsPerHour, new int[][] {
+            {13, 2}, {13, 3}, {14, 0}
+    });
+  }
+
+  @Test
+  public void testNextWith10PeriodsPerHour() {
+    int periodsPerHour = 10;
+
+    ProfilePeriod first = new ProfilePeriod(AUG2016, periodsPerHour);
+    assertPeriod(first, 2016, 238, 13, 4);
+
+    // each entry is the {hour, period} expected after one more call to next()
+    assertNextPeriods(first, periodsPerHour, new int[][] {
+            {13, 5}, {13, 6}, {13, 7}, {13, 8}, {13, 9}, {14, 0}
+    });
+  }
+
+  @Test
+  public void testNextWith240PeriodsPerHour() {
+    final int periodsPerHour = 240;
+
+    ProfilePeriod p = new ProfilePeriod(AUG2016, periodsPerHour);
+    assertPeriod(p, 2016, 238, 13, 108);
+
+    int previous = p.getPeriod();
+    final int steps = periodsPerHour - 108;
+    for(int step = 0; step < steps; step++) {
+      p = p.next();
+
+      // validate the next period
+      assertEquals(2016, p.getYear());
+      assertEquals(238, p.getDayOfYear());
+      assertEquals(periodsPerHour, p.getPeriodsPerHour());
+
+      int expected = previous + 1;
+      if(expected >= periodsPerHour) {
+        // rollover to next hour; nothing more to check
+        assertEquals(14, p.getHour());
+        assertEquals(0, p.getPeriod());
+        break;
+      }
+
+      // still within the same hour
+      assertEquals(13, p.getHour());
+      assertEquals(expected, p.getPeriod());
+
+      previous = p.getPeriod();
+    }
+  }
+
+  /**
+   * With 2 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within period index 0.
+   * Period starts at 'Thu, Aug 25 2016 13:00:00 000 GMT' ~ 1472130000000L
+   */
+  @Test
+  public void testTimeInMillisWith2PeriodsPerHour() {
+    assertEquals(1472130000000L, new ProfilePeriod(AUG2016, 2).getTimeInMillis());
+  }
+
+  /**
+   * With 4 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within period index 1.
+   * Period starts at 'Thu, Aug 25 2016 13:15:00 000 GMT' ~ 1472130900000L
+   */
+  @Test
+  public void testTimeInMillisWith4PeriodsPerHour() {
+    assertEquals(1472130900000L, new ProfilePeriod(AUG2016, 4).getTimeInMillis());
+  }
+
+  /**
+   * With 60 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within period index 27.
+   * Period starts at 'Thu, Aug 25 2016 13:27:00 000 GMT' ~ 1472131620000L
+   */
+  @Test
+  public void testTimeInMillisWith60PeriodsPerHour() {
+    assertEquals(1472131620000L, new ProfilePeriod(AUG2016, 60).getTimeInMillis());
+  }
+
+  /**
+   * With 240 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within period index 108.
+   * Period starts at 'Thu, Aug 25 2016 13:27:00 000 GMT' ~ 1472131620000L
+   */
+  @Test
+  public void testTimeInMillisWith240PeriodsPerHour() {
+    assertEquals(1472131620000L, new ProfilePeriod(AUG2016, 240).getTimeInMillis());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 5390bd5..df8b335 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -164,6 +164,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     bolt.setCuratorFramework(client);
     bolt.setTreeCache(cache);
     bolt.setExecutor(new DefaultStellarExecutor());
+    bolt.setPeriodsPerHour(4);
 
     bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
     return bolt;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
index e96f2f5..1fb25a7 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -22,18 +22,22 @@ package org.apache.metron.profiler.bolt;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+
 /**
  * Tests the ProfileHBaseMapper class.
  */
@@ -43,21 +47,20 @@ public class ProfileHBaseMapperTest {
   ProfileHBaseMapper mapper;
   ProfileMeasurement measurement;
   DefaultStellarExecutor executor;
+  RowKeyBuilder rowKeyBuilder;
 
   @Before
   public void setup() {
     executor = new DefaultStellarExecutor();
 
+    rowKeyBuilder = mock(RowKeyBuilder.class);
+
     mapper = new ProfileHBaseMapper();
     mapper.setExecutor(executor);
-    mapper.setSaltDivisor(0);
+    mapper.setRowKeyBuilder(rowKeyBuilder);
 
-    measurement = new ProfileMeasurement();
-    measurement.setProfileName("profile");
-    measurement.setEntity("entity");
+    measurement = new ProfileMeasurement("profile", "entity", 20000, 4);
     measurement.setValue(22);
-    measurement.setStart(20000);
-    measurement.setEnd(50000);
 
     // the tuple will contain the original message
     tuple = mock(Tuple.class);
@@ -65,175 +68,47 @@ public class ProfileHBaseMapperTest {
   }
 
   /**
-   * There is a single group in the 'groupBy' expression that simply returns the string "group1".
+   * The mapper should execute the 'groupBy' Stellar expressions and use that to generate
+   * a row key.
    */
   @Test
-  public void testRowKeyWithOneGroupBy() throws Exception {
-    // setup
-    measurement.setGroupBy(Arrays.asList("'group1'"));
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .put("group1".getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
-
-    // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
-  }
+  public void testExecuteGroupBy() throws Exception {
 
-  /**
-   * The user can define multiple 'groupBy' expressions.
-   */
-  @Test
-  public void testRowKeyWithTwoGroupBy() throws Exception {
-    // setup
-    measurement.setGroupBy(Arrays.asList("'group1'", "'group2'"));
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .put("group1".getBytes())
-            .put("group2".getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
+    // setup - a single 'groupBy' expression; plain arithmetic that evaluates to 4.0
+    measurement.setGroupBy(Arrays.asList("2 + 2"));
 
-    // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
-  }
+    // execute
+    mapper.rowKey(tuple);
 
-  /**
-   * A 'groupBy' expression can return any type; not just Strings.
-   */
-  @Test
-  public void testRowKeyWithOneIntegerGroupBy() throws Exception {
-    // setup
-    measurement.setGroupBy(Arrays.asList("200"));
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .put(Integer.valueOf(200).toString().getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
+    // capture the list of evaluated groups that the mapper hands to the RowKeyBuilder
+    ArgumentCaptor<List> arg = ArgumentCaptor.forClass(List.class);
+    verify(rowKeyBuilder).rowKey(any(), arg.capture());
 
     // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
+    List<Object> actual = arg.getValue();
+    Assert.assertEquals(4.0, actual.get(0));
   }
 
   /**
-   * A user does not have to define a 'groupBy'.  It is an optional field.
+   * The mapper should execute each 'groupBy' Stellar expression and use that to generate
+   * a row key.  There can be multiple.
    */
   @Test
-  public void testRowKeyWithNoGroupBy() throws Exception {
-    // setup
-    measurement.setGroupBy(Collections.emptyList());
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
+  public void testExecuteMultipleGroupBys() throws Exception {
 
-    // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
-  }
+    // setup - two 'groupBy' expressions; plain arithmetic that evaluates to 4.0 and 8.0
+    measurement.setGroupBy(Arrays.asList("2 + 2", "4 + 4"));
 
-  /**
-   * A user does not have to define a 'groupBy'.  It is an optional field.
-   */
-  @Test
-  public void testRowKeyWithNullGroupBy() throws Exception {
-    // setup
-    measurement.setGroupBy(null);
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
-
-    // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
-  }
+    // execute
+    mapper.rowKey(tuple);
 
-  /**
-   * A 'groupBy' expression can refer to the fields within the ProfileMeasurement.  The
-   * most important fields likely being the starting and ending timestamp.  With these fields
-   * any calendar based group can be defined.  For example, the day of week, week of month, etc
-   * can all be calculated from these vaulues.
-   */
-  @Test
-  public void testRowKeyWithGroupByUsingMeasurementField() throws Exception {
-
-    // setup - the group expression refers to the 'end' timestamp contained within the ProfileMeasurement
-    measurement.setGroupBy(Arrays.asList("end"));
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .put(Long.valueOf(measurement.getEnd()).toString().getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
-
-    // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
-  }
-
-  /**
-   * If the saltDivisor > 0, then the row key should be prepended with a salt.  This
-   * can be used to prevent hotspotting.
-   */
-  @Test
-  public void testRowKeyWithSalt() throws Exception {
-    // setup
-    mapper.setSaltDivisor(100);
-    measurement.setGroupBy(Collections.emptyList());
-
-    // the expected row key
-    ByteBuffer buffer = ByteBuffer
-            .allocate(100)
-            .put(ProfileHBaseMapper.getSalt(measurement.getStart(), mapper.getSaltDivisor()))
-            .put(measurement.getProfileName().getBytes())
-            .put(measurement.getEntity().getBytes())
-            .putLong(measurement.getStart());
-    buffer.flip();
-    final byte[] expected = new byte[buffer.limit()];
-    buffer.get(expected, 0, buffer.limit());
+    // capture the list of evaluated groups that the mapper hands to the RowKeyBuilder
+    ArgumentCaptor<List> arg = ArgumentCaptor.forClass(List.class);
+    verify(rowKeyBuilder).rowKey(any(), arg.capture());
 
     // validate
-    byte[] actual = mapper.rowKey(tuple);
-    Assert.assertTrue(Arrays.equals(expected, actual));
+    List<Object> actual = arg.getValue();
+    Assert.assertEquals(4.0, actual.get(0));
+    Assert.assertEquals(8.0, actual.get(1));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
new file mode 100644
index 0000000..dce7757
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
@@ -0,0 +1,275 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Formatter;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the SaltyRowKeyBuilder.
+ */
+public class SaltyRowKeyBuilderTest {
+
+  private static final int saltDivisor = 1000;
+  private static final int periodsPerHour = 4;
+
+  private SaltyRowKeyBuilder rowKeyBuilder;
+  private ProfileMeasurement measurement;
+  private Tuple tuple;
+
+  /**
+   * Thu, Aug 25 2016 09:27:10 EDT
+   * Thu, Aug 25 2016 13:27:10 GMT
+   *
+   * 238th day of the year
+   */
+  private long AUG2016 = 1472131630748L;
+
+  @Before
+  public void setup() throws Exception {
+
+    // a profile measurement
+    measurement = new ProfileMeasurement("profile", "entity", AUG2016, periodsPerHour);
+    measurement.setValue(22);
+
+    // the tuple will contain the original message
+    tuple = mock(Tuple.class);
+    when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement);
+  }
+
+  /**
+   * Build a row key that includes only one group.
+   */
+  @Test
+  public void testRowKeyWithOneGroup() throws Exception {
+    // setup
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+    List<Object> groups = Arrays.asList("group1");
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(SaltyRowKeyBuilder.getSalt(measurement.getPeriod(), saltDivisor))
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put("group1".getBytes())
+            .putInt(2016)
+            .putInt(238)
+            .putInt(13)
+            .putInt(1);
+
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = rowKeyBuilder.rowKey(measurement, groups);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * Build a row key that includes two groups.
+   */
+  @Test
+  public void testRowKeyWithTwoGroups() throws Exception {
+    // setup
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+    List<Object> groups = Arrays.asList("group1","group2");
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(SaltyRowKeyBuilder.getSalt(measurement.getPeriod(), saltDivisor))
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put("group1".getBytes())
+            .put("group2".getBytes())
+            .putInt(2016)
+            .putInt(238)
+            .putInt(13)
+            .putInt(1);
+
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = rowKeyBuilder.rowKey(measurement, groups);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * Build a row key that includes a single group that is an integer.
+   */
+  @Test
+  public void testRowKeyWithOneIntegerGroup() throws Exception {
+    // setup
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+    List<Object> groups = Arrays.asList(200);
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(SaltyRowKeyBuilder.getSalt(measurement.getPeriod(), saltDivisor))
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put("200".getBytes())
+            .putInt(2016)
+            .putInt(238)
+            .putInt(13)
+            .putInt(1);
+
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = rowKeyBuilder.rowKey(measurement, groups);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * Build a row key that includes two groups of different types; an integer and a string.
+   */
+  @Test
+  public void testRowKeyWithMixedGroups() throws Exception {
+    // setup
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+    List<Object> groups = Arrays.asList(200, "group1");
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(SaltyRowKeyBuilder.getSalt(measurement.getPeriod(), saltDivisor))
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .put("200".getBytes())
+            .put("group1".getBytes())
+            .putInt(2016)
+            .putInt(238)
+            .putInt(13)
+            .putInt(1);
+
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = rowKeyBuilder.rowKey(measurement, groups);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * Build a row key that does not include any groups.
+   */
+  @Test
+  public void testRowKeyWithNoGroup() throws Exception {
+    // setup
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+    List<Object> groups = Collections.emptyList();
+
+    // the expected row key
+    ByteBuffer buffer = ByteBuffer
+            .allocate(100)
+            .put(SaltyRowKeyBuilder.getSalt(measurement.getPeriod(), saltDivisor))
+            .put(measurement.getProfileName().getBytes())
+            .put(measurement.getEntity().getBytes())
+            .putInt(2016)
+            .putInt(238)
+            .putInt(13)
+            .putInt(1);
+
+    buffer.flip();
+    final byte[] expected = new byte[buffer.limit()];
+    buffer.get(expected, 0, buffer.limit());
+
+    // validate
+    byte[] actual = rowKeyBuilder.rowKey(measurement, groups);
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+
+  /**
+   * `rowKeys` should return all of the row keys needed to retrieve the profile values over a given time horizon.
+   */
+  @Test
+  public void testRowKeys() throws Exception {
+    int hoursAgo = 1;
+
+    // setup
+    List<Object> groups = Collections.emptyList();
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+
+    // a dummy profile measurement
+    long oldest = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hoursAgo);
+    ProfileMeasurement m = new ProfileMeasurement("profile", "entity", oldest, periodsPerHour);
+    m.setValue(22);
+
+    // generate a list of expected keys
+    List<byte[]> expectedKeys = new ArrayList<>();
+    for  (int i=0; i<(hoursAgo * periodsPerHour)+1; i++) {
+
+      // generate the expected key
+      byte[] rk = rowKeyBuilder.rowKey(m, groups);
+      expectedKeys.add(rk);
+
+      // advance to the next period
+      ProfilePeriod next = m.getPeriod().next();
+      m = new ProfileMeasurement("profile", "entity", next.getTimeInMillis(), periodsPerHour);
+    }
+
+    // execute
+    List<byte[]> actualKeys = rowKeyBuilder.rowKeys(measurement.getProfileName(), measurement.getEntity(), groups, hoursAgo, TimeUnit.HOURS);
+
+    // validate - expectedKeys == actualKeys
+    for(int i=0; i<actualKeys.size(); i++) {
+      byte[] actual = actualKeys.get(i);
+      byte[] expected = expectedKeys.get(i);
+      assertThat(actual, equalTo(expected));
+    }
+  }
+
+  private void printBytes(byte[] bytes) {
+    StringBuilder sb = new StringBuilder(bytes.length * 2);
+    Formatter formatter = new Formatter(sb);
+    for (byte b : bytes) {
+      formatter.format("%02x ", b);
+    }
+    System.out.println(sb.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
new file mode 100644
index 0000000..69de4ba
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the Serializer.
+ */
+public class SerializerTest {
+
+  @Test
+  public void testInteger() {
+    final int expected = 2;
+    byte[] raw = Serializer.toBytes(expected);
+    int actual = Serializer.fromBytes(raw, Integer.class);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDouble() {
+    final double expected = 2.0;
+    byte[] raw = Serializer.toBytes(expected);
+    double actual = Serializer.fromBytes(raw, Double.class);
+    assertEquals(expected, actual, 0.01);
+  }
+
+  @Test
+  public void testShort() {
+    final short expected = 2;
+    byte[] raw = Serializer.toBytes(expected);
+    short actual = Serializer.fromBytes(raw, Short.class);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testLong() {
+    final long expected = 2L;
+    byte[] raw = Serializer.toBytes(expected);
+    long actual = Serializer.fromBytes(raw, Long.class);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testFloat() {
+    final Float expected = 2.2F;
+    byte[] raw = Serializer.toBytes(expected);
+    float actual = Serializer.fromBytes(raw, Float.class);
+    assertEquals(expected, actual, 0.01);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 66c1308..03e25a4 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -33,7 +33,8 @@ import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.profiler.bolt.ProfileHBaseMapper;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
 import org.apache.metron.test.mock.MockHTable;
 import org.junit.After;
 import org.junit.Assert;
@@ -94,6 +95,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   @Multiline
   private String message3;
 
+  private ColumnBuilder columnBuilder;
   private FluxTopologyComponent fluxComponent;
   private KafkaWithZKComponent kafkaComponent;
   private List<byte[]> input;
@@ -133,7 +135,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - there are 5 'HTTP' each with 390 bytes
-    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    double actual = readDouble(columnBuilder.getColumnQualifier("value"));
     Assert.assertEquals(390.0 * 5, actual, 0.01);
   }
 
@@ -154,7 +156,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - there are 5 'HTTP' and 5 'DNS' messages thus 5/5 = 1
-    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    double actual = readDouble(columnBuilder.getColumnQualifier("value"));
     Assert.assertEquals(5.0 / 5.0, actual, 0.01);
   }
 
@@ -175,7 +177,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    double actual = readDouble(columnBuilder.getColumnQualifier("value"));
     Assert.assertEquals(20.0, actual, 0.01);
   }
 
@@ -193,7 +195,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    double actual = readInteger(ProfileHBaseMapper.QVALUE);
+    double actual = readInteger(columnBuilder.getColumnQualifier("value"));
     Assert.assertEquals(10.0, actual, 0.01);
   }
 
@@ -211,7 +213,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - the 70th percentile of 5 x 20s = 20.0
-    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    double actual = readDouble(columnBuilder.getColumnQualifier("value"));
     Assert.assertEquals(20.0, actual, 0.01);
   }
 
@@ -224,7 +226,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     ResultScanner scanner = profilerTable.getScanner(cf, columnQual);
 
     for (Result result : scanner) {
-      byte[] raw = result.getValue(cf, ProfileHBaseMapper.QVALUE);
+      byte[] raw = result.getValue(cf, columnQual);
       return Bytes.toDouble(raw);
     }
 
@@ -240,7 +242,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     ResultScanner scanner = profilerTable.getScanner(cf, columnQual);
 
     for (Result result : scanner) {
-      byte[] raw = result.getValue(cf, ProfileHBaseMapper.QVALUE);
+      byte[] raw = result.getValue(cf, columnQual);
       return Bytes.toInt(raw);
     }
 
@@ -248,6 +250,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   }
 
   public void setup(String pathToConfig) throws Exception {
+    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
 
     // create input messages for the profiler to consume
     input = Stream.of(message1, message2, message3)
@@ -262,9 +265,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
       setProperty("profiler.input.topic", Constants.INDEXING_TOPIC);
-      setProperty("profiler.flush.interval.seconds", "15");
+      setProperty("profiler.periods.per.hour", "240");
       setProperty("profiler.hbase.salt.divisor", "10");
       setProperty("profiler.hbase.table", tableName);
+      setProperty("profiler.hbase.column.family", columnFamily);
       setProperty("profiler.hbase.batch", "10");
       setProperty("profiler.hbase.flush.interval.seconds", "1");
       setProperty("hbase.provider.impl", "" + MockTableProvider.class.getName());
@@ -307,4 +311,4 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       runner.stop();
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
new file mode 100644
index 0000000..7a5cbcf
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
@@ -0,0 +1,161 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.stellar;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.dsl.Context;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the DefaultStellarExecutor.
+ */
+public class DefaultStellarExecutorTest {
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "ip_dst_addr": "10.0.0.20"
+   * }
+   */
+  @Multiline
+  private String input;
+
+  private JSONObject message;
+  private DefaultStellarExecutor executor;
+
+  @Before
+  public void setup() throws ParseException {
+
+    // parse the input message
+    JSONParser parser = new JSONParser();
+    message = (JSONObject) parser.parse(input);
+
+    // create the executor to test
+    executor = new DefaultStellarExecutor();
+    executor.setContext(Context.EMPTY_CONTEXT());
+  }
+
+  /**
+   * Ensure that a value can be assigned to a variable.
+   */
+  @Test
+  public void testAssign() {
+    executor.assign("foo", "2", message);
+
+    // verify
+    Object var = executor.getState().get("foo");
+    assertThat(var, instanceOf(Integer.class));
+    assertThat(var, equalTo(2));
+  }
+
+  /**
+   * Ensure that a variable can be resolved from a message field.
+   */
+  @Test
+  public void testAssignWithVariableResolution() {
+    executor.assign("foo", "ip_src_addr", message);
+
+    // verify
+    Object var = executor.getState().get("foo");
+    assertThat(var, instanceOf(String.class));
+    assertThat(var, equalTo("10.0.0.1"));
+  }
+
+  /**
+   * Ensure that state is maintained correctly in the execution environment.
+   */
+  @Test
+  public void testState() {
+    executor.assign("two", "2", message);
+    executor.assign("four", "4", message);
+    executor.assign("sum", "two + four", message);
+
+    // verify
+    Object var = executor.getState().get("sum");
+    assertEquals(6.0, var);
+  }
+
+  /**
+   * Ensure that state can be cleared from the execution environment.
+   */
+  @Test
+  public void testClearState() {
+    executor.assign("two", "2", message);
+    executor.clearState();
+
+    // verify
+    assertThat(executor.getState().containsKey("two"), equalTo(false));
+  }
+
+  /**
+   * Ensure that a Transformation function can be executed.
+   *
+   * There are two sets of functions in Stellar currently.  One can be executed with
+   * a PredicateProcessor and the other a TransformationProcessor.  The StellarExecutor
+   * abstracts away that complication.
+   */
+  @Test
+  public void testExecuteTransformation() {
+    String actual = executor.execute("TO_UPPER('lowercase')", message, String.class);
+    assertThat(actual, equalTo("LOWERCASE"));
+  }
+
+  /**
+   * Ensure that a Predicate function can be executed.
+   *
+   * There are two sets of functions in Stellar currently.  One can be executed with
+   * a PredicateProcessor and the other a TransformationProcessor.  The StellarExecutor
+   * abstracts away that complication.
+   */
+  @Test
+  public void testExecutePredicate() {
+    boolean actual = executor.execute("IS_INTEGER(2)", message, Boolean.class);
+    assertThat(actual, equalTo(true));
+  }
+
+  /**
+   * An exception is expected if an expression results in an unexpected type.
+   */
+  @Test(expected = RuntimeException.class)
+  public void testExecuteWithWrongType() {
+    executor.execute("2 + 2", message, Boolean.class);
+  }
+
+  /**
+   * A best effort should be made to do sensible type conversions.
+   */
+  @Test
+  public void testExecuteWithTypeConversion() {
+    executor.execute("2", message, Double.class);
+    executor.execute("2", message, Float.class);
+    executor.execute("2", message, Short.class);
+    executor.execute("2", message, Long.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
deleted file mode 100644
index c11d94a..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.util;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.dsl.Context;
-import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests the DefaultStellarExecutor.
- */
-public class DefaultStellarExecutorTest {
-
-  /**
-   * {
-   *   "ip_src_addr": "10.0.0.1",
-   *   "ip_dst_addr": "10.0.0.20"
-   * }
-   */
-  @Multiline
-  private String input;
-
-  private JSONObject message;
-  private DefaultStellarExecutor executor;
-
-  @Before
-  public void setup() throws ParseException {
-
-    // parse the input message
-    JSONParser parser = new JSONParser();
-    message = (JSONObject) parser.parse(input);
-
-    // create the executor to test
-    executor = new DefaultStellarExecutor();
-    executor.setContext(Context.EMPTY_CONTEXT());
-  }
-
-  /**
-   * Ensure that a value can be assigned to a variable.
-   */
-  @Test
-  public void testAssign() {
-    executor.assign("foo", "2", message);
-
-    // verify
-    Object var = executor.getState().get("foo");
-    assertThat(var, instanceOf(Integer.class));
-    assertThat(var, equalTo(2));
-  }
-
-  /**
-   * Ensure that a variable can be resolved from a message field.
-   */
-  @Test
-  public void testAssignWithVariableResolution() {
-    executor.assign("foo", "ip_src_addr", message);
-
-    // verify
-    Object var = executor.getState().get("foo");
-    assertThat(var, instanceOf(String.class));
-    assertThat(var, equalTo("10.0.0.1"));
-  }
-
-  /**
-   * Ensure that state is maintained correctly in the execution environment.
-   */
-  @Test
-  public void testState() {
-    executor.assign("two", "2", message);
-    executor.assign("four", "4", message);
-    executor.assign("sum", "two + four", message);
-
-    // verify
-    Object var = executor.getState().get("sum");
-    assertEquals(6.0, var);
-  }
-
-  /**
-   * Ensure that state is maintained correctly in the execution environment.
-   */
-  @Test
-  public void testClearState() {
-    executor.assign("two", "2", message);
-    executor.clearState();
-
-    // verify
-    assertThat(executor.getState().containsKey("two"), equalTo(false));
-  }
-
-  /**
-   * Ensure that a Transformation function can be executed.
-   *
-   * There are two sets of functions in Stellar currently.  One can be executed with
-   * a PredicateProcessor and the other a TransformationProcessor.  The StellarExecutor
-   * abstracts away that complication.
-   */
-  @Test
-  public void testExecuteTransformation() {
-    String actual = executor.execute("TO_UPPER('lowercase')", message, String.class);
-    assertThat(actual, equalTo("LOWERCASE"));
-  }
-
-  /**
-   * Ensure that a Predicate function can be executed.
-   *
-   * There are two sets of functions in Stellar currently.  One can be executed with
-   * a PredicateProcessor and the other a TransformationProcessor.  The StellarExecutor
-   * abstracts away that complication.
-   */
-  @Test
-  public void testExecutePredicate() {
-    boolean actual = executor.execute("IS_INTEGER(2)", message, Boolean.class);
-    assertThat(actual, equalTo(true));
-  }
-
-  /**
-   * An exception is expected if an expression results in an unexpected type.
-   */
-  @Test(expected = RuntimeException.class)
-  public void testExecuteWithWrongType() {
-    executor.execute("2 + 2", message, Boolean.class);
-  }
-
-  /**
-   * A best effort should be made to do sensible type conversions.
-   */
-  @Test
-  public void testExecuteWithTypeConversion() {
-    executor.execute("2", message, Double.class);
-    executor.execute("2", message, Float.class);
-    executor.execute("2", message, Short.class);
-    executor.execute("2", message, Long.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/pom.xml b/metron-analytics/pom.xml
index ca20912..2e62be7 100644
--- a/metron-analytics/pom.xml
+++ b/metron-analytics/pom.xml
@@ -43,6 +43,7 @@
 		<module>metron-maas-service</module>
 		<module>metron-maas-common</module>
 		<module>metron-profiler</module>
+		<module>metron-profiler-client</module>
 	</modules>
 	<dependencies>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
index 9aa511c..0731b00 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 
 public class Context implements Serializable {
 
@@ -56,12 +55,12 @@ public class Context implements Serializable {
 
   public static Context EMPTY_CONTEXT() {
     return
-    new Context(new HashMap<>()){
-      @Override
-      public Optional<Object> getCapability(String capability) {
-        return Optional.empty();
-      }
-    };
+            new Context(new HashMap<>()){
+              @Override
+              public Optional<Object> getCapability(String capability) {
+                return Optional.empty();
+              }
+            };
   }
 
   private Map<String, Capability> capabilities;
@@ -89,4 +88,4 @@ public class Context implements Serializable {
   public void addCapability(Enum<?> s, Capability capability) {
     this.capabilities.put(s.toString(), capability);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index d19633e..925c63d 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -45,9 +45,11 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests the HBaseClient
@@ -128,6 +130,10 @@ public class HBaseClientTest {
     Get get1 = client.constructGetRequests(rowKey1, criteria);
     Result[] results = client.batchGet(Arrays.asList(get1));
     Assert.assertEquals(1, results.length);
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals(widget1, toWidget(results[0]));
   }
 
   @Test
@@ -152,6 +158,24 @@ public class HBaseClientTest {
     Get get1 = client.constructGetRequests(rowKey1, criteria);
     Get get2 = client.constructGetRequests(rowKey2, criteria);
     Result[] results = client.batchGet(Arrays.asList(get1, get2));
-    Assert.assertEquals(2, results.length);
+
+    // validate
+    assertEquals(2, results.length);
+    List<Widget> expected = Arrays.asList(widget1, widget2);
+    for(Result result : results) {
+      Widget widget = toWidget(result);
+      Assert.assertThat(expected, hasItem(widget));
+    }
+  }
+
+  /**
+   * Transforms the HBase Result to a Widget.
+   * @param result The HBase Result.
+   * @return The Widget.
+   */
+  private Widget toWidget(Result result) {
+    int cost = Bytes.toInt(result.getValue(WidgetMapper.CF, WidgetMapper.QCOST));
+    String name = Bytes.toString(result.getValue(WidgetMapper.CF, WidgetMapper.QNAME));
+    return new Widget(name, cost);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index c522f8d..723aa71 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -23,6 +23,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -130,7 +131,7 @@ public class MockHTable implements HTableInterface {
 
   @Override
   public Configuration getConfiguration() {
-    throw new UnsupportedOperationException();
+    return HBaseConfiguration.create();
   }
 
   @Override


Mime
View raw message