metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [2/2] incubator-metron git commit: METRON-389 Create Java API to Read Profile Data During Model Scoring (nickwallen) closes apache/incubator-metron#236
Date Mon, 12 Sep 2016 15:19:23 GMT
METRON-389 Create Java API to Read Profile Data During Model Scoring (nickwallen) closes apache/incubator-metron#236


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/3dfd7be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/3dfd7be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/3dfd7be6

Branch: refs/heads/master
Commit: 3dfd7be6f9b5325bdce01ed21f15d2ed13384ea3
Parents: 2e5538c
Author: nickwallen <nick@nickallen.org>
Authored: Mon Sep 12 11:11:29 2016 -0400
Committer: Nick Allen <nick@nickallen.org>
Committed: Mon Sep 12 11:11:29 2016 -0400

----------------------------------------------------------------------
 metron-analytics/metron-profiler-client/pom.xml | 291 ++++++++++++++++
 .../src/main/assembly/assembly.xml              |  37 ++
 .../profiler/client/HBaseProfilerClient.java    | 117 +++++++
 .../metron/profiler/client/ProfilerClient.java  |  45 +++
 .../client/HBaseProfilerClientTest.java         | 217 ++++++++++++
 .../src/test/resources/log4j.properties         |  28 ++
 metron-analytics/metron-profiler/README.md      |  40 +--
 metron-analytics/metron-profiler/pom.xml        | 113 +++---
 .../src/main/config/profiler.properties         |   4 +-
 .../src/main/flux/profiler/remote.yaml          |  25 +-
 .../metron/profiler/ProfileMeasurement.java     |  63 ++--
 .../apache/metron/profiler/ProfilePeriod.java   | 199 +++++++++++
 .../profiler/bolt/ProfileBuilderBolt.java       |  47 ++-
 .../profiler/bolt/ProfileHBaseMapper.java       | 210 +++---------
 .../metron/profiler/hbase/ColumnBuilder.java    |  50 +++
 .../metron/profiler/hbase/RowKeyBuilder.java    |  60 ++++
 .../profiler/hbase/SaltyRowKeyBuilder.java      | 211 ++++++++++++
 .../metron/profiler/hbase/Serializer.java       |  92 +++++
 .../profiler/hbase/ValueOnlyColumnBuilder.java  |  75 ++++
 .../metron/profiler/ProfilePeriodTest.java      | 341 +++++++++++++++++++
 .../profiler/bolt/ProfileBuilderBoltTest.java   |   1 +
 .../profiler/bolt/ProfileHBaseMapperTest.java   | 199 ++---------
 .../profiler/hbase/SaltyRowKeyBuilderTest.java  | 275 +++++++++++++++
 .../metron/profiler/hbase/SerializerTest.java   |  71 ++++
 .../integration/ProfilerIntegrationTest.java    |  24 +-
 .../stellar/DefaultStellarExecutorTest.java     | 161 +++++++++
 .../util/DefaultStellarExecutorTest.java        | 162 ---------
 metron-analytics/pom.xml                        |   1 +
 .../org/apache/metron/common/dsl/Context.java   |  15 +-
 .../metron/hbase/client/HBaseClientTest.java    |  26 +-
 .../org/apache/metron/test/mock/MockHTable.java |   3 +-
 31 files changed, 2535 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler-client/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/pom.xml b/metron-analytics/metron-profiler-client/pom.xml
new file mode 100644
index 0000000..f27abba
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/pom.xml
@@ -0,0 +1,291 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-analytics</artifactId>
+        <version>0.2.0BETA</version>
+    </parent>
+    <artifactId>metron-profiler-client</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-profiler</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-test-utilities</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${global_hadoop_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${global_hadoop_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <classifier>tests</classifier>
+            <version>${global_hadoop_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <classifier>tests</classifier>
+            <version>${global_hadoop_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>disruptor</artifactId>
+                    <groupId>com.googlecode.disruptor</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <reporting>
+        <plugins>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
+
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+                <inherited>true</inherited>
+            </plugin>
+        </plugins>
+    </reporting>
+
+    <build>
+        <plugins>
+            <plugin>
+                <!-- Separates the unit tests from the integration tests. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
+                <configuration>
+                    <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+                    <argLine>-Xmx2048m</argLine>
+                    <skip>true</skip>
+                    <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+                    <trimStackTrace>false</trimStackTrace>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>unit-tests</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include unit tests within (unit) test phase. -->
+                                <include>**/*Test.java</include>
+                            </includes>
+                            <excludes>
+                                <!-- Exclude integration tests within (unit) test phase. -->
+                                <exclude>**/*IntegrationTest.java</exclude>
+                            </excludes>
+
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the integration-test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include integration tests within integration-test phase. -->
+                                <include>**/*IntegrationTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.lmax</pattern>
+                                    <shadedPattern>org.apache.metron.lmax.metron-profiler</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava.metron-profiler</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson.core</pattern>
+                                    <shadedPattern>com.fasterxml.jackson.core.metron.elasticsearch</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler-client/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/assembly/assembly.xml b/metron-analytics/metron-profiler-client/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..b34bd98
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/assembly/assembly.xml
@@ -0,0 +1,37 @@
+<!--
+  ~
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<assembly>
+    <id>archive</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>${project.basedir}/target</directory>
+            <includes>
+                <include>${project.artifactId}-${project.version}.jar</include>
+            </includes>
+            <outputDirectory>/lib</outputDirectory>
+            <useDefaultExcludes>true</useDefaultExcludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
new file mode 100644
index 0000000..b4e52b5
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -0,0 +1,117 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.Serializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of a ProfilerClient that fetches profile data persisted in HBase.
+ */
+public class HBaseProfilerClient implements ProfilerClient {
+
+  /**
+   * Used to access the profile data stored in HBase.
+   */
+  private HTableInterface table;
+
+  /**
+   * Generates the row keys necessary to scan HBase.
+   */
+  private RowKeyBuilder rowKeyBuilder;
+
+  /**
+   * Knows how profiles are organized in HBase; supplies the column family and column qualifiers.
+   */
+  private ColumnBuilder columnBuilder;
+
+  public HBaseProfilerClient(HTableInterface table, RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder) {
+    setTable(table);
+    setRowKeyBuilder(rowKeyBuilder);
+    setColumnBuilder(columnBuilder);
+  }
+
+  /**
+   * Fetches the data values of a Profile that were persisted within the given time horizon.
+   *
+   * @param profile The name of the profile.
+   * @param entity The name of the entity.
+   * @param durationAgo How far in the past to fetch values from.
+   * @param unit The time unit of 'durationAgo'.
+   * @param clazz The type that each stored value is deserialized into.
+   * @param groups The groups used to sort the profile data; only rows within these groups are fetched.
+   * @param <T> The type of values stored by the Profile.
+   * @return A mutable list of the profile values found; empty when no values are found.
+   */
+  @Override
+  public <T> List<T> fetch(String profile, String entity, long durationAgo, TimeUnit unit, Class<T> clazz, List<Object> groups) {
+
+    // find all the row keys that satisfy this fetch
+    List<byte[]> keysToFetch = rowKeyBuilder.rowKeys(profile, entity, groups, durationAgo, unit);
+    byte[] columnFamilyBytes = Bytes.toBytes(columnBuilder.getColumnFamily());
+    byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
+
+    // create a Get for each of the row keys, restricted to the single 'value' column
+    List<Get> gets = keysToFetch
+            .stream()
+            .map(k -> new Get(k).addColumn(columnFamilyBytes, columnQualifier))
+            .collect(Collectors.toList());
+
+    // submit the gets to HBase as a single batch
+    try {
+      Result[] results = table.get(gets);
+      // skip any result lacking the value column (e.g. a row that was never written)
+      return Arrays.stream(results)
+              .filter(r -> r.containsColumn(columnFamilyBytes, columnQualifier))
+              .map(r -> r.getValue(columnFamilyBytes, columnQualifier))
+              .map(val -> Serializer.fromBytes(val, clazz))
+              .collect(Collectors.toCollection(ArrayList::new));
+
+    } catch(IOException e) {
+      throw new java.io.UncheckedIOException(
+              String.format("Unable to fetch profile '%s' for entity '%s' from HBase", profile, entity), e);
+    }
+  }
+
+  public void setTable(HTableInterface table) {
+    this.table = table;
+  }
+
+  public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
+    this.rowKeyBuilder = rowKeyBuilder;
+  }
+
+  public void setColumnBuilder(ColumnBuilder columnBuilder) {
+    this.columnBuilder = columnBuilder;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
new file mode 100644
index 0000000..9cae0e9
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
@@ -0,0 +1,45 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An interface for a client capable of retrieving the profile data that has been persisted by the Profiler.
+ */
+public interface ProfilerClient {
+
+  /**
+   * Fetch the measurement values associated with a profile.
+   *
+   * @param profile The name of the profile.
+   * @param entity The name of the entity.
+   * @param durationAgo How far in the past to fetch values from.
+   * @param unit The time unit of 'durationAgo'.
+   * @param clazz The type of values stored by the profile.
+   * @param groups The groups used to sort the profile data.
+   * @param <T> The type of values stored by the Profile.
+   * @return A list of values.
+   */
+  <T> List<T> fetch(String profile, String entity, long durationAgo, TimeUnit unit, Class<T> clazz, List<Object> groups);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
new file mode 100644
index 0000000..7e13f3b
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -0,0 +1,217 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
+import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.storm.hbase.common.ColumnList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the HBaseProfilerClient.
+ *
+ * The naming used in this test attempts to be as close as possible to how the 'groupBy' functionality might
+ * be used 'in the wild'.  This test involves reading and writing two separate groups originating from the same
+ * Profile and Entity.  There is a 'weekdays' group which contains all measurements taken on weekdays.  There is
+ * also a 'weekends' group which contains all measurements taken on weekends.
+ */
+public class HBaseProfilerClientTest {
+
+  private static final String tableName = "profiler";
+  private static final String columnFamily = "P";
+
+  private HBaseProfilerClient client;
+  private HBaseClient hbaseClient;
+  private RowKeyBuilder rowKeyBuilder;
+  private ColumnBuilder columnBuilder;
+  private HTableInterface table;
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void startHBase() throws Exception {
+    Configuration config = HBaseConfiguration.create();
+    config.set("hbase.master.hostname", "localhost");
+    config.set("hbase.regionserver.hostname", "localhost");
+    util = new HBaseTestingUtility(config);
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void stopHBase() throws Exception {
+    util.shutdownMiniCluster();
+    util.cleanupTestDir();
+  }
+
+  @Before
+  public void setup() throws Exception {
+
+    // setup all of the necessary dependencies
+    table = util.createTable(Bytes.toBytes(tableName), Bytes.toBytes(columnFamily));
+    hbaseClient = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
+    rowKeyBuilder = new SaltyRowKeyBuilder();
+    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+    // what we're actually testing
+    client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Writes profile measurements that can be used for testing.  The profile period containing
+   * 'startTime' is skipped; one measurement is written for each of the 'count' periods after it.
+   *
+   * @param count The number of profile measurements to write.
+   * @param profileName Name of the profile.
+   * @param entityName Name of the entity.
+   * @param value The measurement value.
+   * @param periodsPerHour Number of profile periods per hour.
+   * @param startTime When measurements should start to be written.
+   * @param group The groups under which the measurements are written.
+   */
+  private void writeMeasurements(int count, String profileName, String entityName, int value, int periodsPerHour, long startTime, List<Object> group) {
+
+    // the seed measurement only anchors the first period; it is never written
+    ProfileMeasurement m = new ProfileMeasurement(profileName, entityName, startTime, periodsPerHour);
+    m.setValue(value);
+
+    for(int i=0; i<count; i++) {
+
+      // create a measurement for the next profile period
+      ProfilePeriod next = m.getPeriod().next();
+      m = new ProfileMeasurement(profileName, entityName, next.getTimeInMillis(), periodsPerHour);
+      m.setValue(value);
+
+      // write the measurement
+      write(m, group);
+    }
+  }
+
+  /**
+   * Write a ProfileMeasurement.
+   * @param m The ProfileMeasurement to write.
+   * @param groups The groups to use when writing the ProfileMeasurement.
+   */
+  private void write(ProfileMeasurement m, List<Object> groups) {
+
+    byte[] rowKey = rowKeyBuilder.rowKey(m, groups);
+    ColumnList cols = columnBuilder.columns(m);
+
+    List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, Durability.SKIP_WAL);
+    hbaseClient.batchMutate(mutations);
+  }
+
+  /**
+   * The client should be able to distinguish between groups and only fetch those in the correct group.
+   */
+  @Test
+  public void testFetchOneGroup() throws Exception {
+
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final int count = hours * periodsPerHour;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+
+    // setup - write two groups of measurements - 'weekends' and 'weekdays'
+    writeMeasurements(count, "profile1", "entity1", expectedValue, periodsPerHour, startTime, Arrays.asList("weekdays"));
+    writeMeasurements(count, "profile1", "entity1", 0, periodsPerHour, startTime, Arrays.asList("weekends"));
+
+    // execute
+    List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, Arrays.asList("weekdays"));
+
+    // validate
+    assertEquals(count, results.size());
+    results.forEach(actual -> assertEquals(expectedValue, (int) actual));
+  }
+
+  /**
+   * Attempt to fetch a group that does not exist.
+   */
+  @Test
+  public void testFetchNoGroup() {
+
+    // setup
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final int count = hours * periodsPerHour;
+    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+
+    // create two groups of measurements - one on weekdays and one on weekends
+    writeMeasurements(count, "profile1", "entity1", expectedValue, periodsPerHour, startTime, Arrays.asList("weekdays"));
+    writeMeasurements(count, "profile1", "entity1", 0, periodsPerHour, startTime, Arrays.asList("weekends"));
+
+    // execute
+    List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, Arrays.asList("does-not-exist"));
+
+    // validate
+    assertEquals(0, results.size());
+  }
+
+  /**
+   * Only profile data within the requested time horizon should be fetched; older data should not be.
+   */
+  @Test
+  public void testFetchOutsideTimeWindow() throws Exception {
+
+    // setup - create some measurement values from a day ago
+    final int periodsPerHour = 4;
+    final int hours = 2;
+    final List<Object> group = Arrays.asList("weekends");
+    final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+    writeMeasurements(hours * periodsPerHour, "profile1", "entity1", 1000, periodsPerHour, startTime, group);
+
+    // execute - look back only a couple of milliseconds
+    final long millisAgo = 2;
+    List<Integer> results = client.fetch("profile1", "entity1", millisAgo, TimeUnit.MILLISECONDS, Integer.class, group);
+
+    // validate - there should NOT be any results from just a couple of milliseconds ago
+    assertEquals(0, results.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler-client/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/resources/log4j.properties b/metron-analytics/metron-profiler-client/src/test/resources/log4j.properties
new file mode 100644
index 0000000..70be8ae
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#
+
+# Root logger option
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index fd02b18..6d8fea9 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -48,22 +48,12 @@ An expression that determines if a message should be applied to the profile.  A
 
 One or more Stellar expressions used to group the profile measurements when persisted. This is intended to sort the Profile data to allow for a contiguous scan when accessing subsets of the data. 
 
-The 'groupBy' expressions can refer to any field within a `org.apache.metron.profiler.ProfileMeasurement`.  This includes the following fields: 
-  * `profileName`: The name of the profile.
-  * `entity`: The name of the entity being profiled.
-  * `start`: The window start time in milliseconds from the epoch.
-  * `end`: The window end time in milliseconds from the epoch.
-  * `value`: The value calculated over the window period.
-  * `groupBy`: The set of 'groupBy' expressions; not the result of those expressions.
-
-A common use case would be grouping by day of week.  This allows a contiguous scan to access all profile data for Mondays only.  Using the following definition would achieve this. 
+The 'groupBy' expressions can refer to any field within an `org.apache.metron.profiler.ProfileMeasurement`.  A common use case is grouping by day of week, which allows a contiguous scan to access only the profile data for Mondays, for example.  The following definition achieves this.
 
 ```
-"groupBy": [ "DAY_OF_WEEK(start)" ] 
+"groupBy": [ "DAY_OF_WEEK()" ] 
 ```
 
-*NOTE*: A series of date functions will be added to Stellar in a follow-on PR to enhance the types of groups that can be created.
-
 #### `init`
 
 *Optional*
@@ -295,23 +285,27 @@ This section will describe the steps required to get your first profile running.
 
 ## Implementation
 
-## Topology
+## Key Classes
 
-The Profiler is implemented as a Storm topology using the following bolts and spouts.
+* `ProfileMeasurement` - Represents a single data point within a profile.  A profile is effectively a time series composed of many `ProfileMeasurement` values, one for each `ProfilePeriod`.
 
-### KafkaSpout
+* `ProfilePeriod` - The Profiler captures one `ProfileMeasurement` each `ProfilePeriod`.  A `ProfilePeriod` will occur at fixed, deterministic points in time.  This allows for efficient retrieval of profile data.
 
-A spout that consumes messages from a single Kafka topic.  In most cases, the Profiler topology will consume messages from the `indexing` topic.  This topic contains fully enriched messages that are ready to be indexed.  This ensures that profiles can take advantage of all the available data elements.
+* `RowKeyBuilder` - Builds row keys that can be used to read or write profile data to HBase.
 
-### ProfileSplitterBolt
- 
-The bolt responsible for filtering incoming messages and directing each to the one or more downstream bolts that are responsible for building a Profile.  Each message may be needed by 0, 1 or even many Profiles.  Each emitted tuple contains the 'resolved' entity name, the profile definition, and the input message.
+* `ColumnBuilder` - Defines the columns of data stored with a profile measurement.
+
+* `ProfileHBaseMapper` - Defines how the `HBaseBolt` stores profile measurements in HBase.  This class uses a `RowKeyBuilder` and a `ColumnBuilder`.
+
+## Storm Topology
+
+The Profiler is implemented as a Storm topology using the following bolts and spouts.
 
-### ProfileBuilderBolt
+* `KafkaSpout` - A spout that consumes messages from a single Kafka topic.  In most cases, the Profiler topology will consume messages from the `indexing` topic.  This topic contains fully enriched messages that are ready to be indexed.  This ensures that profiles can take advantage of all the available data elements.
 
-This bolt maintains all of the state required to build a Profile.  When the window period expires, the data is summarized as a ProfileMeasurement, all state is flushed, and the ProfileMeasurement is emitted.  Each instance of this bolt is responsible for maintaining the state for a single Profile-Entity pair.
+* `ProfileSplitterBolt` - The bolt responsible for filtering incoming messages and directing each to one or more downstream bolts that are responsible for building a profile.  Each message may be needed by 0, 1 or even many profiles.  Each emitted tuple contains the 'resolved' entity name, the profile definition, and the input message.
 
-### HBaseBolt
+* `ProfileBuilderBolt` - This bolt maintains all of the state required to build a profile.  When the window period expires, the data is summarized as a `ProfileMeasurement`, all state is flushed, and the `ProfileMeasurement` is emitted.  Each instance of this bolt is responsible for maintaining the state for a single Profile-Entity pair.
 
-A bolt that is responsible for writing to HBase.  Most profiles will be flushed every 15 minutes or so.  If each ProfileBuilderBolt were responsible for writing to HBase itself, there would be little to no opportunity to optimize these writes.  By aggregating the writes from multiple Profile-Entity pairs these writes can be batched, for example.
+* `HBaseBolt` - A bolt that is responsible for writing to HBase.  Most profiles will be flushed every 15 minutes or so.  If each `ProfileBuilderBolt` were responsible for writing to HBase itself, there would be little to no opportunity to optimize these writes.  By aggregating the writes from multiple Profile-Entity pairs these writes can be batched, for example.
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/pom.xml b/metron-analytics/metron-profiler/pom.xml
index b31df67..51aa64a 100644
--- a/metron-analytics/metron-profiler/pom.xml
+++ b/metron-analytics/metron-profiler/pom.xml
@@ -28,6 +28,17 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-hbase</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
@@ -42,20 +53,6 @@
             <artifactId>metron-test-utilities</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -80,14 +77,22 @@
             <version>${global_hadoop_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.adrianwalker</groupId>
-            <artifactId>multiline-string</artifactId>
-            <version>0.1.2</version>
-            <scope>test</scope>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
-                    <artifactId>tools</artifactId>
-                    <groupId>sun.jdk</groupId>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
@@ -95,6 +100,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${global_storm_version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>servlet-api</artifactId>
@@ -108,23 +114,25 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hdfs</artifactId>
+            <artifactId>storm-hbase</artifactId>
             <version>${global_storm_version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
-                    <groupId>org.apache.storm</groupId>
-                    <artifactId>storm-core</artifactId>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hbase</artifactId>
+            <artifactId>storm-hdfs</artifactId>
             <version>${global_storm_version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -138,51 +146,6 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <version>${global_hbase_version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-server</artifactId>
-            <version>${global_hbase_version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-testing-util</artifactId>
-            <version>${global_hbase_version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
             <version>${global_mockito_version}</version>
@@ -202,12 +165,10 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-project-info-reports-plugin</artifactId>
                 <version>2.7</version>
-
                 <configuration>
                     <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
                 </configuration>
             </plugin>
-
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>emma-maven-plugin</artifactId>
@@ -285,6 +246,10 @@
                         <configuration>
                             <relocations>
                                 <relocation>
+                                    <pattern>com.lmax</pattern>
+                                    <shadedPattern>org.apache.metron.lmax.metron-profiler</shadedPattern>
+                                </relocation>
+                                <relocation>
                                     <pattern>com.google.common</pattern>
                                     <shadedPattern>org.apache.metron.guava.metron-profiler</shadedPattern>
                                 </relocation>
@@ -334,4 +299,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 80fd2da..4a9cdb4 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -23,12 +23,14 @@
 profiler.workers=1
 profiler.executors=0
 profiler.input.topic=indexing
-profiler.flush.interval.seconds=900
+profiler.periods.per.hour=4
 profiler.hbase.salt.divisor=1000
 profiler.hbase.table=profiler
+profiler.hbase.column.family=P
 profiler.hbase.batch=10
 profiler.hbase.flush.interval.seconds=30
 
+
 ##### Kafka #####
 
 kafka.zk=node1:2181

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index d99510a..835c609 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -25,11 +25,28 @@ components:
     -   id: "defaultExecutor"
         className: "org.apache.metron.profiler.stellar.DefaultStellarExecutor"
 
-    -   id: "hbaseMapper"
-        className: "org.apache.metron.profiler.bolt.ProfileHBaseMapper"
+    -   id: "rowKeyBuilder"
+        className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
         properties:
             - name: "saltDivisor"
               value: ${profiler.hbase.salt.divisor}
+            - name: "periodsPerHour"
+              value: ${profiler.periods.per.hour}
+
+    -   id: "columnBuilder"
+        className: "org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder"
+        constructorArgs:
+            - "${profiler.hbase.column.family}"
+
+    -   id: "hbaseMapper"
+        className: "org.apache.metron.profiler.bolt.ProfileHBaseMapper"
+        properties:
+            - name: "rowKeyBuilder"
+              ref: "rowKeyBuilder"
+            - name: "columnBuilder"
+              ref: "columnBuilder"
+            - name: "executor"
+              ref: "defaultExecutor"
 
     -   id: "zkHosts"
         className: "storm.kafka.ZkHosts"
@@ -76,8 +93,8 @@ bolts:
         properties:
             - name: "executor"
               ref: "defaultExecutor"
-            - name: "flushFrequency"
-              value: ${profiler.flush.interval.seconds}
+            - name: "periodsPerHour"
+              value: ${profiler.periods.per.hour}
 
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.bolt.HBaseBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index 351c5d4..bea0b34 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -41,57 +41,40 @@ public class ProfileMeasurement {
   private String entity;
 
   /**
-   * When the measurement window was started in milliseconds since the epoch.
+   * The actual measurement itself.
    */
-  private long start;
+  private Object value;
 
   /**
-   * When the measurement window closed in milliseconds since the epoch.
+   * A set of expressions used to group the profile measurements when persisted.
    */
-  private long end;
+  private List<String> groupBy;
 
   /**
-   * The actual measurement itself.
+   * The period in which the ProfileMeasurement was taken.
    */
-  private Object value;
+  private ProfilePeriod period;
 
   /**
-   * A set of expressions used to group the profile measurements when persisted.
+   * @param profileName The name of the profile.
+   * @param entity The name of the entity.
+   * @param epochMillis The timestamp when the measurement period has been started in milliseconds since the epoch.
+   * @param periodsPerHour The number of profile periods per hour.
    */
-  private List<String> groupBy;
+  public ProfileMeasurement(String profileName, String entity, long epochMillis, int periodsPerHour) {
+    this.profileName = profileName;
+    this.entity = entity;
+    this.period = new ProfilePeriod(epochMillis, periodsPerHour);
+  }
 
   public String getProfileName() {
     return profileName;
   }
 
-  public void setProfileName(String profileName) {
-    this.profileName = profileName;
-  }
-
   public String getEntity() {
     return entity;
   }
 
-  public void setEntity(String entity) {
-    this.entity = entity;
-  }
-
-  public long getStart() {
-    return start;
-  }
-
-  public void setStart(long start) {
-    this.start = start;
-  }
-
-  public long getEnd() {
-    return end;
-  }
-
-  public void setEnd(long end) {
-    this.end = end;
-  }
-
   public Object getValue() {
     return value;
   }
@@ -100,6 +83,10 @@ public class ProfileMeasurement {
     this.value = value;
   }
 
+  public ProfilePeriod getPeriod() {
+    return period;
+  }
+
   public List<String> getGroupBy() {
     return groupBy;
   }
@@ -115,22 +102,21 @@ public class ProfileMeasurement {
 
     ProfileMeasurement that = (ProfileMeasurement) o;
 
-    if (start != that.start) return false;
-    if (end != that.end) return false;
     if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
     if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
     if (value != null ? !value.equals(that.value) : that.value != null) return false;
-    return groupBy != null ? groupBy.equals(that.groupBy) : that.groupBy == null;
+    if (groupBy != null ? !groupBy.equals(that.groupBy) : that.groupBy != null) return false;
+    return period != null ? period.equals(that.period) : that.period == null;
+
   }
 
   @Override
   public int hashCode() {
     int result = profileName != null ? profileName.hashCode() : 0;
     result = 31 * result + (entity != null ? entity.hashCode() : 0);
-    result = 31 * result + (int) (start ^ (start >>> 32));
-    result = 31 * result + (int) (end ^ (end >>> 32));
     result = 31 * result + (value != null ? value.hashCode() : 0);
     result = 31 * result + (groupBy != null ? groupBy.hashCode() : 0);
+    result = 31 * result + (period != null ? period.hashCode() : 0);
     return result;
   }
 
@@ -139,10 +125,9 @@ public class ProfileMeasurement {
     return "ProfileMeasurement{" +
             "profileName='" + profileName + '\'' +
             ", entity='" + entity + '\'' +
-            ", start=" + start +
-            ", end=" + end +
             ", value=" + value +
             ", groupBy=" + groupBy +
+            ", period=" + period +
             '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
new file mode 100644
index 0000000..648eb28
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -0,0 +1,220 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
+/**
+ * The Profiler captures a ProfileMeasurement once every ProfilePeriod.  There can be
+ * multiple ProfilePeriods every hour.
+ *
+ * <p>Each UTC hour is split into {@code periodsPerHour} consecutive periods.  The period
+ * boundaries depend only on the hour and on {@code periodsPerHour}, so a given timestamp
+ * always maps to the same period.
+ */
+public class ProfilePeriod {
+
+  /**
+   * The number of milliseconds in one hour.
+   */
+  private static final long MILLIS_PER_HOUR = TimeUnit.HOURS.toMillis(1);
+
+  /**
+   * The year.
+   */
+  private int year;
+
+  /**
+   * Day of the year; [1, 366]
+   */
+  private int dayOfYear;
+
+  /**
+   * Hour of the day; [0, 23]
+   */
+  private int hour;
+
+  /**
+   * The period within the hour; [0, periodsPerHour)
+   */
+  private int period;
+
+  /**
+   * The number of periods per hour.  This value must be positive and either a divisor
+   * or a multiple of 60; 1, 2, 4, 6, 240, etc.
+   */
+  private int periodsPerHour;
+
+  /**
+   * The actual time used to initialize the ProfilePeriod.  This value should not be
+   * used for anything other than troubleshooting.
+   */
+  private long epochMillis;
+
+  /**
+   * @param epochMillis A timestamp contained somewhere within the profile period.
+   * @param periodsPerHour The number of periods per hour. Must be positive and either a
+   *                       divisor or a multiple of 60; 1, 2, 4, 6, 240, etc.
+   * @throws IllegalArgumentException If periodsPerHour is not one of the accepted values.
+   */
+  public ProfilePeriod(long epochMillis, int periodsPerHour) {
+
+    // reject non-positive values explicitly; otherwise 0 fails with an ArithmeticException
+    // and negative divisors of 60 (-1, -2, ...) slip through and yield negative periods
+    if(periodsPerHour <= 0) {
+      throw new IllegalArgumentException(format("invalid periodsPerHour: expected=positive, actual=%d", periodsPerHour));
+    }
+
+    // periods per hour must be a divisor or multiple of 60
+    if(60 % periodsPerHour != 0 && periodsPerHour % 60 != 0) {
+      throw new IllegalArgumentException(format("invalid periodsPerHour: expected=divisor/multiple of 60, actual=%d", periodsPerHour));
+    }
+
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    cal.setTimeInMillis(epochMillis);
+
+    this.periodsPerHour = periodsPerHour;
+    this.period = findPeriod(cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND), cal.get(Calendar.MILLISECOND), periodsPerHour);
+    this.hour = cal.get(Calendar.HOUR_OF_DAY);
+    this.dayOfYear = cal.get(Calendar.DAY_OF_YEAR);
+    this.year = cal.get(Calendar.YEAR);
+    this.epochMillis = epochMillis;
+  }
+
+  /**
+   * Returns the next ProfilePeriod in time.
+   */
+  public ProfilePeriod next() {
+    // jump to the exact start of the following period (which may fall in the next hour);
+    // adding a truncated period length to this period's start could land back in this period
+    long nextMillis = getStartOfHourInMillis() + offsetOfPeriod(period + 1, periodsPerHour);
+    return new ProfilePeriod(nextMillis, periodsPerHour);
+  }
+
+  /**
+   * @return The time in milliseconds since the epoch at which this period starts.
+   */
+  public long getTimeInMillis() {
+    return getStartOfHourInMillis() + offsetOfPeriod(period, periodsPerHour);
+  }
+
+  public int getYear() {
+    return year;
+  }
+
+  public int getDayOfYear() {
+    return dayOfYear;
+  }
+
+  public int getHour() {
+    return hour;
+  }
+
+  public int getPeriod() {
+    return period;
+  }
+
+  public int getPeriodsPerHour() {
+    return periodsPerHour;
+  }
+
+  /**
+   * @return The start of this period's hour (UTC) in milliseconds since the epoch.
+   */
+  private long getStartOfHourInMillis() {
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    cal.set(Calendar.YEAR, year);
+    cal.set(Calendar.DAY_OF_YEAR, dayOfYear);
+    cal.set(Calendar.HOUR_OF_DAY, hour);
+    cal.set(Calendar.MINUTE, 0);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTimeInMillis();
+  }
+
+  /**
+   * Determines the period within the hour based on the minutes/seconds/milliseconds on the clock.
+   *
+   * @param minutes The minute within the hour; 0-59.
+   * @param seconds The second within the minute; 0-59.
+   * @param millis The millisecond within the second; 0-999.
+   * @param periodsPerHour The number of periods per hour.
+   * @return The period within the hour; [0, periodsPerHour).
+   */
+  private static int findPeriod(int minutes, int seconds, int millis, int periodsPerHour) {
+    final long millisPastHour = TimeUnit.MINUTES.toMillis(minutes) + TimeUnit.SECONDS.toMillis(seconds) + millis;
+
+    // exact integer arithmetic keeps the result within [0, periodsPerHour), even when a
+    // period is shorter than one second or does not span a whole number of milliseconds
+    return (int) (millisPastHour * periodsPerHour / MILLIS_PER_HOUR);
+  }
+
+  /**
+   * The offset, in milliseconds from the start of the hour, at which a period begins.
+   *
+   * @param index The period within the hour; [0, periodsPerHour].  An index equal to
+   *              periodsPerHour yields the start of the following hour.
+   * @param periodsPerHour The number of periods per hour.
+   */
+  private static long offsetOfPeriod(int index, int periodsPerHour) {
+    // ceiling division: the earliest millisecond for which findPeriod() returns at least 'index'
+    return (index * MILLIS_PER_HOUR + periodsPerHour - 1) / periodsPerHour;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ProfilePeriod that = (ProfilePeriod) o;
+
+    if (year != that.year) return false;
+    if (dayOfYear != that.dayOfYear) return false;
+    if (hour != that.hour) return false;
+    if (period != that.period) return false;
+    return periodsPerHour == that.periodsPerHour;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = year;
+    result = 31 * result + dayOfYear;
+    result = 31 * result + hour;
+    result = 31 * result + period;
+    result = 31 * result + periodsPerHour;
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ProfilePeriod{" +
+            "year=" + year +
+            ", dayOfYear=" + dayOfYear +
+            ", hour=" + hour +
+            ", period=" + period +
+            ", periodsPerHour=" + periodsPerHour +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 40cbb0e..79c046a 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -64,9 +64,10 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private StellarExecutor executor;
 
   /**
-   * The number of seconds between when the Profile is flushed.
+   * The number of times per hour that a profile is flushed and a measurement
+   * is written.  This should be a divisor or multiple of 60; 1, 2, 3, 4, 6, 240, etc.
    */
-  private int flushFrequency;
+  private int periodsPerHour;
 
   /**
    * A ProfileMeasurement is created and emitted each window period.  A Profile
@@ -100,7 +101,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   @Override
   public Map<String, Object> getComponentConfiguration() {
     Config conf = new Config();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, getFlushFrequency());
+
+    // how frequently should the bolt receive tick tuples?
+    long freqInSeconds = ((60 * 60) / periodsPerHour);
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, freqInSeconds);
+
     return conf;
   }
 
@@ -153,18 +158,12 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private void doExecute(Tuple input) {
 
     if(!isTickTuple(input)) {
-
-      // if this is the first tuple in a window period, initialization is needed
       if (!isInitialized()) {
         init(input);
       }
-
-      // update the profile with data from a new message
       update(input);
 
     } else {
-
-      // flush the profile - can only flush if it has been initialized
       if(isInitialized()) {
         flush(input);
       }
@@ -182,10 +181,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     profileConfig = (ProfileConfig) input.getValueByField("profile");
 
     // create the measurement which will be saved at the end of the window period
-    measurement = new ProfileMeasurement();
-    measurement.setStart(getTimestamp());
-    measurement.setEntity(input.getStringByField("entity"));
-    measurement.setProfileName(profileConfig.getProfile());
+    measurement = new ProfileMeasurement(
+            profileConfig.getProfile(),
+            input.getStringByField("entity"),
+            getTimestamp(),
+            periodsPerHour);
 
     // execute the 'init' expression
     try {
@@ -194,8 +194,8 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
       expressions.forEach((var, expr) -> executor.assign(var, expr, message));
 
     } catch(ParseException e) {
-      String msg = format("Bad 'init' expression: %s, profile=%s, entity=%s, start=%d",
-              e.getMessage(), measurement.getProfileName(), measurement.getEntity(), measurement.getStart());
+      String msg = format("Bad 'init' expression: %s, profile=%s, entity=%s",
+              e.getMessage(), measurement.getProfileName(), measurement.getEntity());
       throw new ParseException(msg, e);
     }
   }
@@ -213,8 +213,8 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
       expressions.forEach((var, expr) -> executor.assign(var, expr, message));
 
     } catch(ParseException e) {
-      String msg = format("Bad 'update' expression: %s, profile=%s, entity=%s, start=%d",
-              e.getMessage(), measurement.getProfileName(), measurement.getEntity(), measurement.getStart());
+      String msg = format("Bad 'update' expression: %s, profile=%s, entity=%s",
+              e.getMessage(), measurement.getProfileName(), measurement.getEntity());
       throw new ParseException(msg, e);
     }
   }
@@ -227,8 +227,8 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    * the next window period.
    */
   private void flush(Tuple tickTuple) {
-    LOG.info(String.format("Flushing profile: profile=%s, entity=%s, start=%d",
-            measurement.getProfileName(), measurement.getEntity(), measurement.getStart()));
+    LOG.info(String.format("Flushing profile: profile=%s, entity=%s",
+            measurement.getProfileName(), measurement.getEntity()));
 
     // execute the 'result' expression
     Object result;
@@ -241,7 +241,6 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     }
 
     // emit the completed profile measurement
-    measurement.setEnd(getTimestamp());
     measurement.setValue(result);
     emit(measurement, tickTuple);
 
@@ -294,11 +293,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     this.executor = executor;
   }
 
-  public int getFlushFrequency() {
-    return flushFrequency;
+  public void setPeriodsPerHour(int periodsPerHour) {
+    this.periodsPerHour = periodsPerHour;
   }
 
-  public void setFlushFrequency(int flushFrequency) {
-    this.flushFrequency = flushFrequency;
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index f76b943..b0b33cc 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -22,18 +22,21 @@ package org.apache.metron.profiler.bolt;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.commons.beanutils.BeanMap;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.stellar.StellarExecutor;
 import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
 import org.apache.storm.hbase.common.ColumnList;
 
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
 
 import static java.lang.String.format;
+import static org.apache.commons.collections.CollectionUtils.isEmpty;
 
 /**
  * An HbaseMapper that defines how a ProfileMeasurement is persisted within an HBase table.
@@ -46,26 +49,23 @@ public class ProfileHBaseMapper implements HBaseMapper {
   private StellarExecutor executor;
 
   /**
-   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
-   * divisor is used to generate the salt.
-   *
-   * If the salt divisor is 0, a salt will not be used.  By default, the salt is set
-   * to 0 and is not used in the row key.
-   *
-   * If the salt divisor is not 0, the salt will be prepended to the row key to help
-   * prevent hot-spotting.  When used this constant should be roughly equal to the
-   * number of nodes in the Hbase cluster.
+   * Generates the row keys necessary to store profile data in HBase.
    */
-  private int saltDivisor;
+  private RowKeyBuilder rowKeyBuilder;
 
   /**
-   * The name of the column family.
+   * Generates the ColumnList necessary to store profile data in HBase.
    */
-  private String columnFamily;
+  private ColumnBuilder columnBuilder;
 
   public ProfileHBaseMapper() {
-    setColumnFamily("P");
-    setSaltDivisor(1000);
+    setRowKeyBuilder(new SaltyRowKeyBuilder());
+    setColumnBuilder(new ValueOnlyColumnBuilder());
+  }
+
+  public ProfileHBaseMapper(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder) {
+    setRowKeyBuilder(rowKeyBuilder);
+    setColumnBuilder(columnBuilder);
   }
 
   /**
@@ -76,68 +76,8 @@ public class ProfileHBaseMapper implements HBaseMapper {
   @Override
   public byte[] rowKey(Tuple tuple) {
     ProfileMeasurement m = (ProfileMeasurement) tuple.getValueByField("measurement");
-
-    // execute the 'groupBy' expressions to determine the 'groups' used in the row key
-    String groups = executeGroupBy(m);
-
-    // row key = profile + entity + [group1, ...] + timestamp
-    int length = m.getProfileName().length() + m.getEntity().length() + groups.length() + Long.BYTES;
-
-    ByteBuffer buffer;
-    if(saltDivisor > 0) {
-      // the row key needs to be prepended with a salt
-      byte[] salt = getSalt(m.getStart(), saltDivisor);
-      buffer = ByteBuffer
-              .allocate(length + salt.length)
-              .put(salt);
-
-    } else {
-      // no salt is needed
-      buffer = ByteBuffer
-              .allocate(length);
-    }
-
-    // append the remainder of the fields
-    buffer.put(m.getProfileName().getBytes())
-            .put(m.getEntity().getBytes())
-            .put(groups.getBytes())
-            .putLong(m.getStart());
-
-    buffer.flip();
-    return buffer.array();
-  }
-
-  /**
-   * Executes each of the 'groupBy' expressions.  The results of each
-   * are then appended to one another and returned as a String.
-   * @param m
-   * @return
-   */
-  private String executeGroupBy(ProfileMeasurement m) {
-
-    if(m.getGroupBy() == null || m.getGroupBy().size() == 0) {
-      // no groupBy expressions define
-      return "";
-    }
-
-    // allows each 'groupBy' expression to refer to the fields of the ProfileMeasurement
-    BeanMap measureAsMap = new BeanMap(m);
-    StringBuilder builder = new StringBuilder();
-
-    try {
-      // execute each of the 'groupBy' - build a String out of the results
-      for (String expr : m.getGroupBy()) {
-        Object result = executor.execute(expr, measureAsMap, Object.class);
-        builder.append(result);
-      }
-
-    } catch(Throwable e) {
-      String msg = format("Bad 'groupBy' expression: %s, profile=%s, entity=%s, start=%d",
-              e.getMessage(), m.getProfileName(), m.getEntity(), m.getStart());
-      throw new ParseException(msg, e);
-    }
-
-    return builder.toString();
+    List<Object> groups = executeGroupBy(m);
+    return rowKeyBuilder.rowKey(m, groups);
   }
 
   /**
@@ -147,94 +87,48 @@ public class ProfileHBaseMapper implements HBaseMapper {
   @Override
   public ColumnList columns(Tuple tuple) {
     ProfileMeasurement measurement = (ProfileMeasurement) tuple.getValueByField("measurement");
-
-    byte[] cfBytes = Bytes.toBytes(columnFamily);
-    ColumnList cols = new ColumnList();
-    cols.addColumn(cfBytes, QPROFILE, Bytes.toBytes(measurement.getProfileName()));
-    cols.addColumn(cfBytes, QENTITY, Bytes.toBytes(measurement.getEntity()));
-    cols.addColumn(cfBytes, QSTART, Bytes.toBytes(measurement.getStart()));
-    cols.addColumn(cfBytes, QEND, Bytes.toBytes(measurement.getEnd()));
-    cols.addColumn(cfBytes, QVALUE, toBytes(measurement.getValue()));
-
-    return cols;
-  }
-
-  /**
-   * Serialize a profile measurement's value.
-   *
-   * The value produced by a Profile definition can be any numeric data type.  The data
-   * type depends on how the profile is defined by the user.  The user should be able to
-   * choose the data type that is most suitable for their use case.
-   *
-   * @param value The value to serialize.
-   */
-  private byte[] toBytes(Object value) {
-    byte[] result;
-
-    if(value instanceof Integer) {
-      result = Bytes.toBytes((Integer) value);
-    } else if(value instanceof Double) {
-      result = Bytes.toBytes((Double) value);
-    } else if(value instanceof Short) {
-      result = Bytes.toBytes((Short) value);
-    } else if(value instanceof Long) {
-      result = Bytes.toBytes((Long) value);
-    } else if(value instanceof Float) {
-      result = Bytes.toBytes((Float) value);
-    } else {
-      throw new RuntimeException("Expected 'Number': actual=" + value);
-    }
-
-    return result;
+    return columnBuilder.columns(measurement);
   }
 
   /**
-   * Calculates a salt value that is used as part of the row key.
-   *
-   * The salt is calculated as 'md5(timestamp) % N' where N is a configurable value that ideally
-   * is close to the number of nodes in the Hbase cluster.
-   *
-   * @param epoch The timestamp in epoch millis to use in generating the salt.
+   * Executes each of the 'groupBy' expressions.  The results of these
+   * expressions are the groups used to sort the data as part of the
+   * row key.
+   * @param m The profile measurement.
+   * @return The result of executing the 'groupBy' expressions.
    */
-  public static byte[] getSalt(long epoch, int saltDivisor) {
-    try {
-      MessageDigest digest = MessageDigest.getInstance("MD5");
-      byte[] hash = digest.digest(Bytes.toBytes(epoch));
-      int salt = Bytes.toInt(hash) % saltDivisor;
-      return Bytes.toBytes(salt);
-
-    } catch(NoSuchAlgorithmException e) {
-      throw new RuntimeException(e);
+  private List<Object> executeGroupBy(ProfileMeasurement m) {
+    List<Object> groups = new ArrayList<>();
+
+    if(!isEmpty(m.getGroupBy())) {
+      try {
+        // allows each 'groupBy' expression to refer to the fields of the ProfileMeasurement
+        BeanMap measureAsMap = new BeanMap(m);
+
+        for (String expr : m.getGroupBy()) {
+          Object result = executor.execute(expr, measureAsMap, Object.class);
+          groups.add(result);
+        }
+
+      } catch(Throwable e) {
+        String msg = format("Bad 'groupBy' expression: %s, profile=%s, entity=%s",
+                e.getMessage(), m.getProfileName(), m.getEntity());
+        throw new ParseException(msg, e);
+      }
     }
-  }
-
-  public String getColumnFamily() {
-    return columnFamily;
-  }
-
-  public void setColumnFamily(String columnFamily) {
-    this.columnFamily = columnFamily;
-  }
 
-  public int getSaltDivisor() {
-    return saltDivisor;
+    return groups;
   }
 
-  public void setSaltDivisor(int saltDivisor) {
-    this.saltDivisor = saltDivisor;
+  public void setExecutor(StellarExecutor executor) {
+    this.executor = executor;
   }
 
-  public StellarExecutor getExecutor() {
-    return executor;
+  public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
+    this.rowKeyBuilder = rowKeyBuilder;
   }
 
-  public void setExecutor(StellarExecutor executor) {
-    this.executor = executor;
+  public void setColumnBuilder(ColumnBuilder columnBuilder) {
+    this.columnBuilder = columnBuilder;
   }
-
-  public static final byte[] QPROFILE = Bytes.toBytes("profile");
-  public static final byte[] QENTITY = Bytes.toBytes("entity");
-  public static final byte[] QSTART = Bytes.toBytes("start");
-  public static final byte[] QEND = Bytes.toBytes("end");
-  public static final byte[] QVALUE = Bytes.toBytes("value");
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
new file mode 100644
index 0000000..44bf129
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.hbase.common.ColumnList;
+
+import java.io.Serializable;
+
+/**
+ * Defines how fields in a ProfileMeasurement will be mapped to columns in HBase.
+ */
+public interface ColumnBuilder extends Serializable {
+
+  /**
+   * Generates the HBase columns (as a ColumnList) used to store a ProfileMeasurement.
+   * @param measurement The profile measurement.
+   */
+  ColumnList columns(ProfileMeasurement measurement);
+
+  /**
+   * Returns the name of the column family used to store ProfileMeasurement values.
+   * @return The column family name.
+   */
+  String getColumnFamily();
+
+  /**
+   * Returns the column qualifier for the given field ({@code fieldName}) of a ProfileMeasurement.
+   * @return The column qualifier used to store a ProfileMeasurement's field in HBase.
+   */
+  byte[] getColumnQualifier(String fieldName);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3dfd7be6/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
new file mode 100644
index 0000000..bb8cc80
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
@@ -0,0 +1,60 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.hbase;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builds a row key that can be used to read or write ProfileMeasurement data
+ * to HBase.
+ */
+public interface RowKeyBuilder extends Serializable {
+
+  /**
+   * Builds a row key for a given ProfileMeasurement.
+   *
+   * <p>This method is useful when writing ProfileMeasurements to HBase.
+   *
+   * @param measurement The profile measurement.
+   * @param groups The groups used to sort the profile data.
+   * @return The HBase row key.
+   */
+  byte[] rowKey(ProfileMeasurement measurement, List<Object> groups);
+
+  /**
+   * Builds a list of row keys necessary to retrieve a profile's measurements over
+   * a time horizon.
+   *
+   * <p>This method is useful when attempting to read ProfileMeasurements stored in HBase.
+   *
+   * @param profile The name of the profile.
+   * @param entity The name of the entity.
+   * @param groups The group(s) used to sort the profile data.
+   * @param durationAgo How far back in time to look, expressed in {@code unit}.
+   * @param unit The time unit of {@code durationAgo}.
+   * @return All of the row keys necessary to retrieve the profile measurements.
+   */
+  List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long durationAgo, TimeUnit unit);
+}


Mime
View raw message