metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [12/50] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965
Date Fri, 27 Apr 2018 19:29:52 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
new file mode 100644
index 0000000..2269c86
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
@@ -0,0 +1,255 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the StandAloneProfiler class.
+ */
+public class StandAloneProfilerTest {
+
+  /**
+   * {
+   *   "profiles": [
+   *   ]
+   * }
+   */
+  @Multiline
+  private String noProfiles;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "'global'",
+   *        "init": { "count": 0 },
+   *        "update": { "count": "count + 1" },
+   *        "result": "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String oneProfile;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "'global1'",
+   *        "result": "'result'"
+   *      },
+   *      {
+   *        "profile": "profile2",
+   *        "foreach": "'global2'",
+   *        "result": "'result'"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String twoProfiles;
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "ip_dst_addr": "10.0.0.20",
+   *   "protocol": "HTTP",
+   *   "timestamp": 2222222222222,
+   * }
+   */
+  @Multiline
+  private String messageJson;
+
+  private JSONObject message;
+
+  private long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
+
+  private Context context = Context.EMPTY_CONTEXT();
+
+  @Before
+  public void setup() throws Exception {
+
+    // parse the input message
+    JSONParser parser = new JSONParser();
+    message = (JSONObject) parser.parse(messageJson);
+  }
+
+  @Test
+  public void testWithOneProfile() throws Exception {
+
+    StandAloneProfiler profiler = createProfiler(oneProfile);
+    profiler.apply(message);
+    profiler.apply(message);
+    profiler.apply(message);
+
+    List<ProfileMeasurement> measurements = profiler.flush();
+    assertEquals(1, measurements.size());
+
+    // expect 1 measurement for the 1 profile that has been defined
+    ProfileMeasurement m = measurements.get(0);
+    assertEquals("profile1", m.getProfileName());
+    assertEquals(3, m.getProfileValue());
+  }
+
+
+  @Test
+  public void testWithTwoProfiles() throws Exception {
+
+    StandAloneProfiler profiler = createProfiler(twoProfiles);
+    profiler.apply(message);
+    profiler.apply(message);
+    profiler.apply(message);
+
+    List<ProfileMeasurement> measurements = profiler.flush();
+    assertEquals(2, measurements.size());
+
+    // expect 2 measurements, 1 for each profile
+    List<String> expected = Arrays.asList(new String[] { "profile1", "profile2" });
+    {
+      ProfileMeasurement m = measurements.get(0);
+      assertTrue(expected.contains(m.getProfileName()));
+      assertEquals("result", m.getProfileValue());
+    }
+    {
+      ProfileMeasurement m = measurements.get(1);
+      assertTrue(expected.contains(m.getProfileName()));
+      assertEquals("result", m.getProfileValue());
+    }
+  }
+
+  /**
+   * The message count and route count will always be equal, if there is only one
+   * profile defined.  The message count and route count can be different when there
+   * are multiple profiles defined that each use the same message.
+   */
+  @Test
+  public void testRouteAndMessageCounters() throws Exception {
+    {
+      StandAloneProfiler profiler = createProfiler(noProfiles);
+
+      profiler.apply(message);
+      assertEquals(1, profiler.getMessageCount());
+      assertEquals(0, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(2, profiler.getMessageCount());
+      assertEquals(0, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(3, profiler.getMessageCount());
+      assertEquals(0, profiler.getRouteCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(oneProfile);
+
+      profiler.apply(message);
+      assertEquals(1, profiler.getMessageCount());
+      assertEquals(1, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(2, profiler.getMessageCount());
+      assertEquals(2, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(3, profiler.getMessageCount());
+      assertEquals(3, profiler.getRouteCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(twoProfiles);
+
+      profiler.apply(message);
+      assertEquals(1, profiler.getMessageCount());
+      assertEquals(2, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(2, profiler.getMessageCount());
+      assertEquals(4, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(3, profiler.getMessageCount());
+      assertEquals(6, profiler.getRouteCount());
+    }
+  }
+
+  @Test
+  public void testProfileCount() throws Exception {
+    {
+      StandAloneProfiler profiler = createProfiler(noProfiles);
+      assertEquals(0, profiler.getProfileCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(oneProfile);
+      assertEquals(1, profiler.getProfileCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(twoProfiles);
+      assertEquals(2, profiler.getProfileCount());
+    }
+  }
+
+  /**
+   * Creates a ProfilerConfig based on a string containing JSON.
+   *
+   * @param configAsJSON The config as JSON.
+   * @return The ProfilerConfig.
+   * @throws Exception
+   */
+  private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception {
+
+    InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
+    return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
+  }
+
+  /**
+   * Creates the StandAloneProfiler
+   *
+   * @param profileJson The Profiler configuration to use as a String containing JSON.
+   * @throws Exception
+   */
+  private StandAloneProfiler createProfiler(String profileJson) throws Exception {
+
+    // the TTL and max routes need not be bounded
+    long profileTimeToLiveMillis = Long.MAX_VALUE;
+    long maxNumberOfRoutes = Long.MAX_VALUE;
+
+    ProfilerConfig config = toProfilerConfig(profileJson);
+    return new StandAloneProfiler(config, periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
new file mode 100644
index 0000000..c99b401
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+
+/**
+ * Tests the DefaultClockFactory.
+ */
+public class DefaultClockFactoryTest {
+
+  /**
+   * The object under test.
+   */
+  private DefaultClockFactory clockFactory;
+
+  @Before
+  public void setup() {
+    clockFactory = new DefaultClockFactory();
+  }
+
+  /**
+   * When a 'timestampField' is defined the factory should return a clock
+   * that deals with event time.
+   */
+  @Test
+  public void testCreateEventTimeClock() {
+
+    // configure the profiler to use event time
+    ProfilerConfig config = new ProfilerConfig();
+    config.setTimestampField(Optional.of("timestamp"));
+
+    // the factory should return a clock that handles 'event time'
+    Clock clock = clockFactory.createClock(config);
+    assertTrue(clock instanceof EventTimeClock);
+  }
+
+  /**
+   * When no 'timestampField' is defined the factory should return a clock
+   * that deals with processing time.
+   */
+  @Test
+  public void testCreateProcessingTimeClock() {
+
+    // the profiler uses processing time by default
+    ProfilerConfig config = new ProfilerConfig();
+
+    // the factory should return a clock that handles 'processing time'
+    Clock clock = clockFactory.createClock(config);
+    assertTrue(clock instanceof WallClock);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
new file mode 100644
index 0000000..0397250
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class EventTimeClockTest {
+
+  private final String timestampField = "timestamp";
+
+  public JSONObject createMessage() {
+    return new JSONObject();
+  }
+
+  /**
+   * The event time should be extracted from a field contained within a message.
+   */
+  @Test
+  public void testEventTime() {
+
+    JSONObject message = createMessage();
+
+    // add a field containing a timestamp to the message
+    final Long timestamp = System.currentTimeMillis();
+    message.put(timestampField, timestamp);
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    assertTrue(result.isPresent());
+    assertEquals(timestamp, result.get());
+  }
+
+  /**
+   * If the timestamp field is a String, it should be converted to Long and used as-is.
+   */
+  @Test
+  public void testEventTimeWithString() {
+    JSONObject message = createMessage();
+
+    // the timestamp field is a string
+    final Long timestamp = System.currentTimeMillis();
+    message.put(timestampField, timestamp.toString());
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    assertTrue(result.isPresent());
+    assertEquals(timestamp, result.get());
+  }
+
+  /**
+   * If the message does not contain the timestamp field, then nothing should be returned.
+   */
+  @Test
+  public void testMissingTimestampField() {
+
+    // no timestamp added to the message
+    JSONObject message = createMessage();
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    assertFalse(result.isPresent());
+  }
+
+  /**
+   * No timestamp should be returned if the value stored in the timestamp field
+   * cannot be coerced into a valid timestamp.
+   */
+  @Test
+  public void testInvalidValue() {
+
+    // create a message with an invalid value stored in the timestamp field
+    JSONObject message = createMessage();
+    message.put(timestampField, "invalid-timestamp-value");
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // no value should be returned
+    assertFalse(result.isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
new file mode 100644
index 0000000..76b2d7b
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
@@ -0,0 +1,54 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+
+public class WallClockTest {
+
+  public JSONObject createMessage() {
+    return new JSONObject();
+  }
+
+  /**
+   * The wall clock time ALWAYS comes from the system clock.
+   */
+  @Test
+  public void testCurrentTimeMillis() {
+
+    JSONObject message = createMessage();
+    long before = System.currentTimeMillis();
+
+    // what time is it?
+    WallClock clock = new WallClock();
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    long after = System.currentTimeMillis();
+    assertTrue(result.isPresent());
+    assertTrue(result.get() >= before);
+    assertTrue(result.get() <= after);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index dc5ec07..218ec66 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -328,6 +328,62 @@ Continuing the previous running example, at this point, you have seen how your p
 
 ## Anatomy of a Profile
 
+### Profiler
+
+The Profiler configuration contains only two fields, only one of which is required.
+
+```
+{
+    "profiles": [
+        { "profile": "one", ... },
+        { "profile": "two", ... }
+    ],
+    "timestampField": "timestamp"
+}
+```
+
+| Name                              |               | Description
+|---                                |---            |---
+| [profiles](#profiles)             | Required      | A list of zero or more Profile definitions.
+| [timestampField](#timestampfield) | Optional      | Indicates whether processing time or event time should be used. By default, processing time is enabled.
+
+
+#### `profiles`
+
+*Required*
+
+A list of zero or more Profile definitions.
+
+#### `timestampField`
+
+*Optional*
+
+Indicates whether processing time or event time is used. By default, processing time is enabled.
+
+##### Processing Time
+
+By default, no `timestampField` is defined.  In this case, the Profiler uses system time when generating profiles.  This means that the profiles are generated based on when the data has been processed by the Profiler.  This is also known as 'processing time'.
+
+This is the simplest mode of operation, but it has some drawbacks.  If the Profiler is consuming live data and all is well, the processing and event times will likely remain similar and consistent. If processing time diverges from event time, then the Profiler will generate skewed profiles.
+
+There are a few scenarios that might cause skewed profiles when using processing time.  For example, when a system has undergone a scheduled maintenance window and is restarted, a high volume of messages will need to be processed by the Profiler. The output of the Profiler might indicate an increase in activity during this time, although no change in activity actually occurred on the target network. The same situation could occur if an upstream system that provides telemetry undergoes an outage.
+
+[Event Time](#event-time) can be used to mitigate these problems.
+
+##### Event Time
+
+Alternatively, a `timestampField` can be defined.  This must be the name of a field contained within the telemetry processed by the Profiler.  The Profiler will extract and use the timestamp contained within this field.
+
+* If a message does not contain this field, it will be dropped.
+
+* The field must contain a timestamp in epoch milliseconds expressed as either a number or a string. Otherwise, the message will be dropped.
+
+* The Profiler will use the same field across all telemetry sources and for all profiles.
+
+* Be aware of clock skew across telemetry sources.  If your profile is processing telemetry from multiple sources where the clocks differ significantly, the Profiler may assume that some of those messages are late and ignore them.  Adjusting the [`profiler.window.duration`](#profilerwindowduration) and [`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks.
+
+### Profiles
+
 A profile definition requires a JSON-formatted set of elements, many of which can contain Stellar code.  The specification contains the following elements.  (For the impatient, skip ahead to the [Examples](#examples).)
 
 | Name                          |               | Description
@@ -466,15 +522,19 @@ The values can be changed on disk and then the Profiler topology must be restart
 
 | Setting                                                                       | Description
 |---                                                                            |---
-| [`profiler.input.topic`](#profilerinputtopic)                                 | The name of the Kafka topic from which to consume data.
-| [`profiler.output.topic`](#profileroutputtopic)                               | The name of the Kafka topic to which profile data is written.  Only used with profiles that define the [`triage` result field](#result).
+| [`profiler.input.topic`](#profilerinputtopic)                                 | The name of the input Kafka topic.
+| [`profiler.output.topic`](#profileroutputtopic)                               | The name of the output Kafka topic. 
 | [`profiler.period.duration`](#profilerperiodduration)                         | The duration of each profile period.  
-| [`profiler.period.duration.units`](#profilerperioddurationunits)              | The units used to specify the [`profiler.period.duration`](#profilerperiodduration).  
+| [`profiler.period.duration.units`](#profilerperioddurationunits)              | The units used to specify the [`profiler.period.duration`](#profilerperiodduration).
+| [`profiler.window.duration`](#profilerwindowduration)                         | The duration of each profile window.
+| [`profiler.window.duration.units`](#profilerwindowdurationunits)              | The units used to specify the [`profiler.window.duration`](#profilerwindowduration).
+| [`profiler.window.lag`](#profilerwindowlag)                                   | The maximum time lag for timestamps.
+| [`profiler.window.lag.units`](#profilerwindowlagunits)                        | The units used to specify the [`profiler.window.lag`](#profilerwindowlag).
 | [`profiler.workers`](#profilerworkers)                                        | The number of worker processes for the topology.
 | [`profiler.executors`](#profilerexecutors)                                    | The number of executors to spawn per component.
 | [`profiler.ttl`](#profilerttl)                                                | If a message has not been applied to a Profile in this period of time, the Profile will be forgotten and its resources will be cleaned up.
 | [`profiler.ttl.units`](#profilerttlunits)                                     | The units used to specify the `profiler.ttl`.
-| [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor)                    | A salt is prepended to the row key to help prevent hotspotting.
+| [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor)                    | A salt is prepended to the row key to help prevent hot-spotting.
 | [`profiler.hbase.table`](#profilerhbasetable)                                 | The name of the HBase table that profiles are written to.
 | [`profiler.hbase.column.family`](#profilerhbasecolumnfamily)                  | The column family used to store profiles.
 | [`profiler.hbase.batch`](#profilerhbasebatch)                                 | The number of puts that are written to HBase in a single batch.
@@ -508,6 +568,36 @@ The units used to specify the `profiler.period.duration`.  This value should be
 
 *Important*: To read a profile using the Profiler Client, the Profiler Client's `profiler.client.period.duration.units` property must match this value.  Otherwise, the [Profiler Client](metron-analytics/metron-profiler-client) will be unable to read the profile data.
 
+### `profiler.window.duration`
+
+*Default*: 30
+
+The duration of each profile window.  Telemetry that arrives within a slice of time is processed within a single window.  
+
+Many windows of telemetry will be processed during a single profile period.  This does not change the output of the Profiler, it only changes how the Profiler processes data. The window defines how much data the Profiler processes in a single pass.
+
+This value should be defined along with [`profiler.window.duration.units`](#profilerwindowdurationunits).
+
+This value must be less than the period duration as defined by [`profiler.period.duration`](#profilerperiodduration) and [`profiler.period.duration.units`](#profilerperioddurationunits).
+
+### `profiler.window.duration.units`
+
+*Default*: SECONDS
+
+The units used to specify the `profiler.window.duration`.  This value should be defined along with [`profiler.window.duration`](#profilerwindowduration).
+
+### `profiler.window.lag`
+
+*Default*: 1
+
+The maximum time lag for timestamps. Timestamps cannot arrive out-of-order by more than this amount. This value should be defined along with [`profiler.window.lag.units`](#profilerwindowlagunits).
+
+### `profiler.window.lag.units`
+
+*Default*: SECONDS
+
+The units used to specify the `profiler.window.lag`.  This value should be defined along with [`profiler.window.lag`](#profilerwindowlag).
+
 ### `profiler.workers`
 
 *Default*: 1

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 896f8d5..fe3c475 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -22,6 +22,10 @@
 
 topology.worker.childopts=
 topology.auto-credentials=
+profiler.workers=1
+profiler.executors=0
+topology.message.timeout.secs=30
+topology.max.spout.pending=100000
 
 ##### Profiler #####
 
@@ -29,10 +33,16 @@ profiler.input.topic=indexing
 profiler.output.topic=enrichments
 profiler.period.duration=15
 profiler.period.duration.units=MINUTES
-profiler.workers=1
-profiler.executors=0
+profiler.window.duration=30
+profiler.window.duration.units=SECONDS
 profiler.ttl=30
 profiler.ttl.units=MINUTES
+profiler.window.lag=1
+profiler.window.lag.units=MINUTES
+profiler.max.routes.per.bolt=10000
+
+##### HBase #####
+
 profiler.hbase.salt.divisor=1000
 profiler.hbase.table=profiler
 profiler.hbase.column.family=P

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 9ec5ba4..83c9fde 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,10 +17,12 @@
 name: "profiler"
 
 config:
-    topology.worker.childopts: ${topology.worker.childopts}
     topology.workers: ${profiler.workers}
     topology.acker.executors: ${profiler.executors}
+    topology.worker.childopts: ${topology.worker.childopts}
     topology.auto-credentials: ${topology.auto-credentials}
+    topology.message.timeout.secs: ${topology.message.timeout.secs}
+    topology.max.spout.pending: ${topology.max.spout.pending}
 
 components:
 
@@ -107,11 +109,23 @@ components:
             -   name: "withProducerConfigs"
                 args: [ref: "kafkaWriterProps"]
 
-    -   id: "kafkaDestinationHandler"
-        className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler"
+    -   id: "kafkaEmitter"
+        className: "org.apache.metron.profiler.bolt.KafkaEmitter"
+
+    -   id: "hbaseEmitter"
+        className: "org.apache.metron.profiler.bolt.HBaseEmitter"
+
+    -   id: "windowDuration"
+        className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
+        constructorArgs:
+            - ${profiler.window.duration}
+            - "${profiler.window.duration.units}"
 
-    -   id: "hbaseDestinationHandler"
-        className: "org.apache.metron.profiler.bolt.HBaseDestinationHandler"
+    -   id: "windowLag"
+        className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
+        constructorArgs:
+            - ${profiler.window.lag}
+            - "${profiler.window.lag.units}"
 
 spouts:
 
@@ -129,17 +143,23 @@ bolts:
 
     -   id: "builderBolt"
         className: "org.apache.metron.profiler.bolt.ProfileBuilderBolt"
-        constructorArgs:
-            - "${kafka.zk}"
         configMethods:
+            - name: "withZookeeperUrl"
+              args: ["${kafka.zk}"]
             - name: "withPeriodDuration"
               args: [${profiler.period.duration}, "${profiler.period.duration.units}"]
             - name: "withProfileTimeToLive"
               args: [${profiler.ttl}, "${profiler.ttl.units}"]
-            - name: "withDestinationHandler"
-              args: [ref: "kafkaDestinationHandler"]
-            - name: "withDestinationHandler"
-              args: [ref: "hbaseDestinationHandler"]
+            - name: "withEmitter"
+              args: [ref: "kafkaEmitter"]
+            - name: "withEmitter"
+              args: [ref: "hbaseEmitter"]
+            - name: "withTumblingWindow"
+              args: [ref: "windowDuration"]
+            - name: "withLag"
+              args: [ref: "windowLag"]
+            - name: "withMaxNumberOfRoutes"
+              args: [${profiler.max.routes.per.bolt}]
 
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.bolt.HBaseBolt"

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
deleted file mode 100644
index 2257784..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-
-/**
- * This class handles the mechanics of emitting a profile measurement to a
- * stream responsible for writing to a specific destination.
- *
- * The measurements produced by a profile can be written to one or more
- * destinations; HBase, Kafka, etc.  Each of the destinations leverage a
- * separate stream within the topology definition.
- */
-public interface DestinationHandler {
-
-  /**
-   * Each destination leverages a unique stream.  This method defines
-   * the unique stream identifier.
-   *
-   * The stream identifier must also be declared within the topology
-   * definition.
-   */
-  String getStreamId();
-
-  /**
-   * Declares the output fields for the stream.
-   * @param declarer
-   */
-  void declareOutputFields(OutputFieldsDeclarer declarer);
-
-  /**
-   * Emit the measurement.
-   * @param measurement The measurement to emit.
-   * @param collector The output collector.
-   */
-  void emit(ProfileMeasurement measurement, OutputCollector collector);
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
new file mode 100644
index 0000000..b9f57dd
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
@@ -0,0 +1,126 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Signals a flush on a fixed frequency; every X milliseconds.
+ */
+public class FixedFrequencyFlushSignal implements FlushSignal {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The latest known timestamp.
+   */
+  private long currentTime;
+
+  /**
+   * The time when the next flush should occur.
+   */
+  private long flushTime;
+
+  /**
+   * The amount of time between flushes in milliseconds.
+   */
+  private long flushFrequency;
+
+  public FixedFrequencyFlushSignal(long flushFrequencyMillis) {
+
+    if(flushFrequencyMillis < 0) {
+      throw new IllegalArgumentException("flush frequency must be >= 0");
+    }
+
+    this.flushFrequency = flushFrequencyMillis;
+    reset();
+  }
+
+  /**
+   * Resets the state used to keep track of time.
+   */
+  @Override
+  public void reset() {
+    flushTime = 0;
+    currentTime = 0;
+
+    LOG.debug("Flush counters reset");
+  }
+
+  /**
+   * Update the internal state which tracks time.
+   *
+   * @param timestamp The timestamp received within a tuple.
+   */
+  @Override
+  public void update(long timestamp) {
+
+    if(timestamp > currentTime) {
+
+      // need to update current time
+      LOG.debug("Updating current time; last={}, new={}", currentTime, timestamp);
+      currentTime = timestamp;
+
+    } else if ((currentTime - timestamp) > flushFrequency) {
+
+      // significantly out-of-order timestamps
+      LOG.warn("Timestamps out-of-order by '{}' ms. This may indicate a problem in the data. last={}, current={}",
+              (currentTime - timestamp),
+              timestamp,
+              currentTime);
+    }
+
+    if(flushTime == 0) {
+
+      // set the next time to flush
+      flushTime = currentTime + flushFrequency;
+      LOG.debug("Setting flush time; flushTime={}, currentTime={}, flushFreq={}",
+              flushTime,
+              currentTime,
+              flushFrequency);
+    }
+  }
+
+  /**
+   * Returns true, if it is time to flush.
+   *
+   * @return True if time to flush.  Otherwise, false.
+   */
+  @Override
+  public boolean isTimeToFlush() {
+
+    boolean flush = currentTime > flushTime;
+    LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
+            flush,
+            flush ? 0 : (flushTime-currentTime),
+            currentTime,
+            flushTime);
+
+    return flush;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
new file mode 100644
index 0000000..0a9fc76
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+/**
+ * Signals when it is time to flush a profile.
+ */
+public interface FlushSignal {
+
+  /**
+   * Returns true, if it is time to flush.
+   *
+   * @return True if time to flush.  Otherwise, false.
+   */
+  boolean isTimeToFlush();
+
+  /**
+   * Update the signaller with a known timestamp.
+   *
+   * @param timestamp A timestamp expected to be epoch milliseconds
+   */
+  void update(long timestamp);
+
+  /**
+   * Reset the signaller.
+   */
+  void reset();
+
+  /**
+   * Returns the current time in epoch milliseconds.
+   * @return The current time in epoch milliseconds.
+   */
+  long currentTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
deleted file mode 100644
index 4fa5dc1..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.io.Serializable;
-
-/**
- * Handles emitting a ProfileMeasurement to the stream which writes
- * profile measurements to HBase.
- */
-public class HBaseDestinationHandler implements DestinationHandler, Serializable {
-
-  /**
-   * The stream identifier used for this destination;
-   */
-  private String streamId = "hbase";
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream(getStreamId(), new Fields("measurement"));
-  }
-
-  @Override
-  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
-    collector.emit(getStreamId(), new Values(measurement));
-  }
-
-  @Override
-  public String getStreamId() {
-    return streamId;
-  }
-
-  public void setStreamId(String streamId) {
-    this.streamId = streamId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
new file mode 100644
index 0000000..8e1229a
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will
+ * persist data in HBase.
+ */
+public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private  String streamId = "hbase";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(getStreamId(), new Fields("measurement"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+    collector.emit(getStreamId(), new Values(measurement));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
deleted file mode 100644
index be82468..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles emitting a ProfileMeasurement to the stream which writes
- * profile measurements to Kafka.
- */
-public class KafkaDestinationHandler implements DestinationHandler, Serializable {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * The stream identifier used for this destination;
-   */
-  private String streamId = "kafka";
-
-  /**
-   * The 'source.type' of messages originating from the Profiler.
-   */
-  private String sourceType = "profiler";
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    // the kafka writer expects a field named 'message'
-    declarer.declareStream(getStreamId(), new Fields("message"));
-  }
-
-  @Override
-  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
-
-    JSONObject message = new JSONObject();
-    message.put("profile", measurement.getDefinition().getProfile());
-    message.put("entity", measurement.getEntity());
-    message.put("period", measurement.getPeriod().getPeriod());
-    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
-    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
-    message.put("timestamp", System.currentTimeMillis());
-    message.put("source.type", sourceType);
-    message.put("is_alert", "true");
-
-    // append each of the triage values to the message
-    measurement.getTriageValues().forEach((key, value) -> {
-
-      if(isValidType(value)) {
-        message.put(key, value);
-
-      } else {
-        LOG.error(String.format("triage expression has invalid type. expect primitive types only. skipping: profile=%s, entity=%s, expression=%s, type=%s",
-                measurement.getDefinition().getProfile(), measurement.getEntity(), key, ClassUtils.getShortClassName(value, "null")));
-      }
-    });
-
-    collector.emit(getStreamId(), new Values(message));
-  }
-
-  /**
-   * The result of a profile's triage expressions must be a string or primitive type.
-   *
-   * This ensures that the value can be easily serialized and appended to a message destined for Kafka.
-   *
-   * @param value The value of a triage expression.
-   * @return True, if the type of the value is valid.
-   */
-  private boolean isValidType(Object value) {
-    return value != null && (value instanceof String || ClassUtils.isPrimitiveOrWrapper(value.getClass()));
-  }
-
-  @Override
-  public String getStreamId() {
-    return streamId;
-  }
-
-  public void setStreamId(String streamId) {
-    this.streamId = streamId;
-  }
-
-  public void setSourceType(String sourceType) {
-    this.sourceType = sourceType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
new file mode 100644
index 0000000..29d1a49
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
@@ -0,0 +1,114 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will
+ * write data to Kafka.
+ */
+public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private String streamId = "kafka";
+
+  /**
+   * The 'source.type' of messages originating from the Profiler.
+   */
+  private String sourceType = "profiler";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // the kafka writer expects a field named 'message'
+    declarer.declareStream(getStreamId(), new Fields("message"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+
+    // append each of the triage values to the message
+    measurement.getTriageValues().forEach((key, value) -> {
+
+      if(isValidType(value)) {
+        message.put(key, value);
+
+      } else {
+        LOG.error(String.format(
+                "triage expression must result in primitive type, skipping; type=%s, profile=%s, entity=%s, expr=%s",
+                ClassUtils.getShortClassName(value, "null"),
+                measurement.getDefinition().getProfile(),
+                measurement.getEntity(),
+                key));
+      }
+    });
+
+    collector.emit(getStreamId(), new Values(message));
+  }
+
+  /**
+   * The result of a profile's triage expressions must be a string or primitive type.
+   *
+   * This ensures that the value can be easily serialized and appended to a message destined for Kafka.
+   *
+   * @param value The value of a triage expression.
+   * @return True, if the type of the value is valid.
+   */
+  private boolean isValidType(Object value) {
+    return value != null && (value instanceof String || ClassUtils.isPrimitiveOrWrapper(value.getClass()));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  public void setSourceType(String sourceType) {
+    this.sourceType = sourceType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
new file mode 100644
index 0000000..d8e9539
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+/**
+ * Signals that a flush should occur.
+ *
+ * <p>The flush signal can be turned on or off like a switch as needed.  Most useful for testing.
+ */
+public class ManualFlushSignal implements FlushSignal {
+
+  private boolean flushNow = false;
+
+  public void setFlushNow(boolean flushNow) {
+    this.flushNow = flushNow;
+  }
+
+  @Override
+  public boolean isTimeToFlush() {
+    return flushNow;
+  }
+
+  @Override
+  public void update(long timestamp) {
+    // nothing to do
+  }
+
+  @Override
+  public void reset() {
+    // nothing to do.
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    // not needed
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 3c8d875..ffe823f 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -20,19 +20,36 @@
 
 package org.apache.metron.profiler.bolt;
 
-import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
 import org.apache.metron.profiler.DefaultMessageDistributor;
+import org.apache.metron.profiler.MessageDistributor;
 import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.windowing.TupleWindow;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
@@ -42,42 +59,76 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static java.lang.String.format;
+import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
+import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.MESSAGE_TUPLE_FIELD;
+import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.PROFILE_TUPLE_FIELD;
+import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD;
 
 /**
- * A bolt that is responsible for building a Profile.
- *
- * This bolt maintains the state required to build a Profile.  When the window
- * period expires, the data is summarized as a ProfileMeasurement, all state is
- * flushed, and the ProfileMeasurement is emitted.
+ * A Storm bolt that is responsible for building a profile.
  *
+ * <p>This bolt maintains the state required to build a Profile.  When the window
+ * period expires, the data is summarized as a {@link ProfileMeasurement}, all state is
+ * flushed, and the {@link ProfileMeasurement} is emitted.
  */
-public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
+public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private OutputCollector collector;
 
   /**
+   * The URL to connect to Zookeeper.
+   */
+  private String zookeeperUrl;
+
+  /**
+   * The Zookeeper client connection.
+   */
+  protected CuratorFramework zookeeperClient;
+
+  /**
+   * The Zookeeper cache.
+   */
+  protected ZKCache zookeeperCache;
+
+  /**
+   * Manages configuration for the Profiler.
+   */
+  private ProfilerConfigurations configurations;
+
+  /**
    * The duration of each profile period in milliseconds.
    */
   private long periodDurationMillis;
 
   /**
+   * The duration of Storm's event window in milliseconds.
+   */
+  private long windowDurationMillis;
+
+  /**
    * If a message has not been applied to a Profile in this number of milliseconds,
    * the Profile will be forgotten and its resources will be cleaned up.
    *
-   * WARNING: The TTL must be at least greater than the period duration.
+   * <p>WARNING: The TTL must be greater than or equal to the period duration.
    */
   private long profileTimeToLiveMillis;
 
   /**
+   * The maximum number of {@link MessageRoute} routes that will be maintained by
+   * this bolt.  After this value is exceeded, lesser used routes will be evicted
+   * from the internal cache.
+   */
+  private long maxNumberOfRoutes;
+
+  /**
    * Distributes messages to the profile builders.
    */
-  private DefaultMessageDistributor messageDistributor;
+  private MessageDistributor messageDistributor;
 
   /**
    * Parses JSON messages.
@@ -85,112 +136,245 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private transient JSONParser parser;
 
   /**
-   * The measurements produced by a profile can be written to multiple destinations.  Each
-   * destination is handled by a separate `DestinationHandler`.
+   * Responsible for emitting {@link ProfileMeasurement} values.
+   *
+   * <p>The {@link ProfileMeasurement} values generated by a profile can be written to
+   * multiple endpoints like HBase or Kafka.  Each endpoint is handled by a separate
+   * {@link ProfileMeasurementEmitter}.
    */
-  private List<DestinationHandler> destinationHandlers;
+  private List<ProfileMeasurementEmitter> emitters;
 
   /**
-   * @param zookeeperUrl The Zookeeper URL that contains the configuration data.
+   * Signals when it is time to flush.
    */
-  public ProfileBuilderBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-    this.destinationHandlers = new ArrayList<>();
-  }
+  private FlushSignal flushSignal;
 
-  /**
-   * Defines the frequency at which the bolt will receive tick tuples.  Tick tuples are
-   * used to control how often a profile is flushed.
-   */
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    // how frequently should the bolt receive tick tuples?
-    Config conf = new Config();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TimeUnit.MILLISECONDS.toSeconds(periodDurationMillis));
-    return conf;
+  public ProfileBuilderBolt() {
+    this.emitters = new ArrayList<>();
   }
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     super.prepare(stormConf, context, collector);
 
+    if(periodDurationMillis <= 0) {  // zero is rejected as well, so the bound is strict
+      throw new IllegalArgumentException("expect 'profiler.period.duration' > 0");
+    }
+    if(profileTimeToLiveMillis <= 0) {  // zero is rejected as well, so the bound is strict
+      throw new IllegalArgumentException("expect 'profiler.ttl' > 0");
+    }
     if(profileTimeToLiveMillis < periodDurationMillis) {
-      throw new IllegalStateException(format(
-              "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
-              profileTimeToLiveMillis,
-              periodDurationMillis));
+      throw new IllegalArgumentException("expect 'profiler.ttl' >= 'profiler.period.duration'");
     }
+    if(maxNumberOfRoutes <= 0) {
+      throw new IllegalArgumentException("expect 'profiler.max.routes.per.bolt' > 0");
+    }
+    if(windowDurationMillis <= 0) {
+      throw new IllegalArgumentException("expect 'profiler.window.duration' > 0");
+    }
+    if(windowDurationMillis > periodDurationMillis) {
+      throw new IllegalArgumentException("expect 'profiler.period.duration' >= 'profiler.window.duration'");
+    }
+    if(periodDurationMillis % windowDurationMillis != 0) {  // the period must be a whole number of windows
+      throw new IllegalArgumentException("expect 'profiler.period.duration' % 'profiler.window.duration' == 0");
+    }
+
     this.collector = collector;
     this.parser = new JSONParser();
-    this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis);
+    this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes);
+    this.configurations = new ProfilerConfigurations();
+    this.flushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
+    setupZookeeper();
+  }
+
+  @Override
+  public void cleanup() {
+    zookeeperCache.close();
+    zookeeperClient.close();
+  }
+
+  private void setupZookeeper() {
+    try {
+      if (zookeeperClient == null) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zookeeperClient = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+      }
+      zookeeperClient.start();
+
+      // this is temporary to ensure that any validation passes. the individual bolt
+      // will reinitialize stellar to dynamically pull from zookeeper.
+      ConfigurationsUtils.setupStellarStatically(zookeeperClient);
+      if (zookeeperCache == null) {
+        ConfigurationsUpdater<ProfilerConfigurations> updater = createUpdater();
+        SimpleEventListener listener = new SimpleEventListener.Builder()
+                .with( updater::update, TreeCacheEvent.Type.NODE_ADDED, TreeCacheEvent.Type.NODE_UPDATED)
+                .with( updater::delete, TreeCacheEvent.Type.NODE_REMOVED)
+                .build();
+        zookeeperCache = new ZKCache.Builder()
+                .withClient(zookeeperClient)
+                .withListener(listener)
+                .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+                .build();
+        updater.forceUpdate(zookeeperClient);
+        zookeeperCache.start();
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
+    return new ProfilerUpdater(this, this::getConfigurations);
+  }
+
+  public ProfilerConfigurations getConfigurations() {
+    return configurations;
+  }
+
+  @Override
+  public void reloadCallback(String name, ConfigurationType type) {
+    // nothing to do
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    if(destinationHandlers.size() == 0) {
+
+    if(emitters.size() == 0) {
       throw new IllegalStateException("At least one destination handler must be defined.");
     }
 
-    // each destination will define its own stream
-    destinationHandlers.forEach(dest -> dest.declareOutputFields(declarer));
+    // allow each emitter to define its own stream
+    emitters.forEach(emitter -> emitter.declareOutputFields(declarer));
+  }
+
+  /**
+   * Defines the frequency at which the bolt will receive tick tuples.  Tick tuples are
+   * used to control how often expired profiles are flushed.
+   */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+
+    Map<String, Object> conf = super.getComponentConfiguration();
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TimeUnit.MILLISECONDS.toSeconds(profileTimeToLiveMillis));
+    return conf;
   }
 
   private Context getStellarContext() {
+
     Map<String, Object> global = getConfigurations().getGlobalConfig();
     return new Context.Builder()
-            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> zookeeperClient)
             .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
             .with(Context.Capabilities.STELLAR_CONFIG, () -> global)
             .build();
   }
 
-  /**
-   * Expect to receive either a tick tuple or a telemetry message that needs applied
-   * to a profile.
-   * @param input The tuple.
-   */
   @Override
-  public void execute(Tuple input) {
+  public void execute(TupleWindow window) {
+
+    LOG.debug("Tuple window contains {} tuple(s), {} expired, {} new",
+            CollectionUtils.size(window.get()),
+            CollectionUtils.size(window.getExpired()),
+            CollectionUtils.size(window.getNew()));
+
     try {
-      if(TupleUtils.isTick(input)) {
-        handleTick();
 
-      } else {
-        handleMessage(input);
+      // handle each tuple in the window
+      for(Tuple tuple : window.get()) {
+
+        if(TupleUtils.isTick(tuple)) {
+          handleTick();
+
+        } else {
+          handleMessage(tuple);
+        }
+      }
+
+      // time to flush?
+      if(flushSignal.isTimeToFlush()) {
+        flushSignal.reset();
+
+        // flush the active profiles
+        List<ProfileMeasurement> measurements = messageDistributor.flush();
+        emitMeasurements(measurements);
+
+        LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
       }
 
     } catch (Throwable e) {
-      LOG.error(format("Unexpected failure: message='%s', tuple='%s'", e.getMessage(), input), e);
-      collector.reportError(e);
 
-    } finally {
-      collector.ack(input);
+      LOG.error("Unexpected error", e);
+      collector.reportError(e);
     }
   }
 
   /**
-   * Handles a telemetry message
-   * @param input The tuple.
+   * Flush all expired profiles when a 'tick' is received.
+   *
+   * If a profile has not received a message for an extended period of time then it is
+   * marked as expired.  Periodically we need to flush these expired profiles to ensure
+   * that their state is not lost.
    */
-  private void handleMessage(Tuple input) throws ExecutionException {
-    JSONObject message = getField("message", input, JSONObject.class);
-    ProfileConfig definition = getField("profile", input, ProfileConfig.class);
-    String entity = getField("entity", input, String.class);
-    MessageRoute route = new MessageRoute(definition, entity);
+  private void handleTick() {
+
+    // flush the expired profiles
+    List<ProfileMeasurement> measurements = messageDistributor.flushExpired();
+    emitMeasurements(measurements);
 
-    messageDistributor.distribute(message, route, getStellarContext());
+    LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size());
   }
 
   /**
-   * Handles a tick tuple.
+   * Handles the processing of a single tuple.
+   *
+   * @param input The tuple containing a telemetry message.
    */
-  private void handleTick() {
-    List<ProfileMeasurement> measurements = messageDistributor.flush();
+  private void handleMessage(Tuple input) {
+
+    // crack open the tuple
+    JSONObject message = getField(MESSAGE_TUPLE_FIELD, input, JSONObject.class);
+    ProfileConfig definition = getField(PROFILE_TUPLE_FIELD, input, ProfileConfig.class);
+    String entity = getField(ENTITY_TUPLE_FIELD, input, String.class);
+    Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class);
+
+    // keep track of time
+    flushSignal.update(timestamp);
+    
+    // distribute the message
+    MessageRoute route = new MessageRoute(definition, entity);
+    messageDistributor.distribute(message, timestamp, route, getStellarContext());
 
-    // forward the measurements to each destination handler
-    for(ProfileMeasurement m : measurements ) {
-      destinationHandlers.forEach(handler -> handler.emit(m, collector));
+    LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp);
+  }
+
+  /**
+   * Handles the {@code ProfileMeasurement}s that are created when a profile is flushed.
+   *
+   * @param measurements The measurements to handle.
+   */
+  private void emitMeasurements(List<ProfileMeasurement> measurements) {
+
+    // flush each profile
+    for(ProfileMeasurement measurement: measurements) {
+
+      // allow each 'emitter' to emit the measurement
+      for (ProfileMeasurementEmitter emitter : emitters) {
+        emitter.emit(measurement, collector);
+
+        LOG.debug("Measurement emitted; stream={}, profile={}, entity={}, value={}, start={}, end={}, duration={}, period={}",
+                emitter.getStreamId(),
+                measurement.getProfileName(),
+                measurement.getEntity(),
+                measurement.getProfileValue(),
+                measurement.getPeriod().getStartTimeMillis(),
+                measurement.getPeriod().getEndTimeMillis(),
+                measurement.getPeriod().getDurationMillis(),
+                measurement.getPeriod().getPeriod());
+      }
     }
+
+    LOG.debug("Emitted {} measurement(s).", measurements.size());
   }
 
   /**
@@ -202,14 +386,27 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    * @param <T> The type of the field value.
    */
   private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) {
+
     T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz);
     if(value == null) {
-      throw new IllegalStateException(format("invalid tuple received: missing or invalid field '%s'", fieldName));
+      throw new IllegalStateException(format("Invalid tuple: missing or invalid field '%s'", fieldName));
     }
 
     return value;
   }
 
+  @Override
+  public BaseWindowedBolt withTumblingWindow(BaseWindowedBolt.Duration duration) {
+
+    // need to capture the window duration for setting the flush count down
+    this.windowDurationMillis = duration.value;
+    return super.withTumblingWindow(duration);
+  }
+
+  public long getPeriodDurationMillis() {
+    return periodDurationMillis;
+  }
+
   public ProfileBuilderBolt withPeriodDurationMillis(long periodDurationMillis) {
     this.periodDurationMillis = periodDurationMillis;
     return this;
@@ -224,16 +421,55 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     return this;
   }
 
+  public long getWindowDurationMillis() {
+    return windowDurationMillis;
+  }
+
   public ProfileBuilderBolt withProfileTimeToLive(int duration, TimeUnit units) {
     return withProfileTimeToLiveMillis(units.toMillis(duration));
   }
 
-  public ProfileBuilderBolt withDestinationHandler(DestinationHandler handler) {
-    this.destinationHandlers.add(handler);
+  public ProfileBuilderBolt withEmitter(ProfileMeasurementEmitter emitter) {
+    this.emitters.add(emitter);
     return this;
   }
 
-  public DefaultMessageDistributor getMessageDistributor() {
+  public MessageDistributor getMessageDistributor() {
     return messageDistributor;
   }
+
+  public ProfileBuilderBolt withZookeeperUrl(String zookeeperUrl) {
+    this.zookeeperUrl = zookeeperUrl;
+    return this;
+  }
+
+  public ProfileBuilderBolt withZookeeperClient(CuratorFramework zookeeperClient) {
+    this.zookeeperClient = zookeeperClient;
+    return this;
+  }
+
+  public ProfileBuilderBolt withZookeeperCache(ZKCache zookeeperCache) {
+    this.zookeeperCache = zookeeperCache;
+    return this;
+  }
+
+  public ProfileBuilderBolt withProfilerConfigurations(ProfilerConfigurations configurations) {
+    this.configurations = configurations;
+    return this;
+  }
+
+  public ProfileBuilderBolt withMaxNumberOfRoutes(long maxNumberOfRoutes) {
+    this.maxNumberOfRoutes = maxNumberOfRoutes;
+    return this;
+  }
+
+  public ProfileBuilderBolt withFlushSignal(FlushSignal flushSignal) {
+    this.flushSignal = flushSignal;
+    return this;
+  }
+
+  public ProfileBuilderBolt withMessageDistributor(MessageDistributor messageDistributor) {
+    this.messageDistributor = messageDistributor;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
new file mode 100644
index 0000000..e1fe4e1
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
@@ -0,0 +1,59 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+
+/**
+ * Handles the mechanics of emitting a {@link ProfileMeasurement} to an output
+ * stream.
+ *
+ * <p>The Profiler allows the measurements produced by a profile to be written to
+ * multiple endpoints such as HBase and Kafka.  Each of these endpoints will have
+ * a unique stream that the measurements are written to.
+ *
+ * <p>Implementors of this interface are responsible for defining and managing the
+ * output stream for a specific endpoint.
+ */
+public interface ProfileMeasurementEmitter {
+
+  /**
+   * Each destination leverages a unique stream.  This method defines
+   * the unique stream identifier.
+   *
+   * <p>The stream identifier must also be declared within the topology definition.
+   * @return The unique stream identifier.
+   */
+  String getStreamId();
+
+  /**
+   * Declares the output fields for the stream.
+   * @param declarer The declarer on which the stream's output fields are declared.
+   */
+  void declareOutputFields(OutputFieldsDeclarer declarer);
+
+  /**
+   * Emit the measurement.
+   * @param measurement The measurement to emit.
+   * @param collector The output collector.
+   */
+  void emit(ProfileMeasurement measurement, OutputCollector collector);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index a453c66..4e62eee 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -21,10 +21,14 @@
 package org.apache.metron.profiler.bolt;
 
 import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
-import org.apache.metron.profiler.MessageRouter;
-import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.DefaultMessageRouter;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.MessageRouter;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.DefaultClockFactory;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -42,16 +46,45 @@ import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
- * The bolt responsible for filtering incoming messages and directing
- * each to the one or more bolts responsible for building a Profile.  Each
- * message may be needed by 0, 1 or even many Profiles.
+ * The Storm bolt responsible for filtering incoming messages and directing
+ * each to the downstream bolts responsible for building a Profile.
  */
 public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /**
+   * The name of the tuple field containing the entity.
+   *
+   * This is the result of executing a profile's 'entity' Stellar expression within
+   * the context of the telemetry message.
+   */
+  protected static final String ENTITY_TUPLE_FIELD = "entity";
+
+  /**
+   * The name of the tuple field containing the profile definition.
+   */
+  protected static final String PROFILE_TUPLE_FIELD = "profile";
+
+  /**
+   * The name of the tuple field containing the telemetry message.
+   */
+  protected static final String MESSAGE_TUPLE_FIELD = "message";
+
+  /**
+   * The name of the tuple field containing the timestamp of the telemetry message.
+   *
+   * <p>If a 'timestampField' has been configured, the timestamp was extracted
+   * from a field within the telemetry message.  This enables event time processing.
+   *
+   * <p>If a 'timestampField' has not been configured, then the Profiler uses
+   * processing time and the timestamp originated from the system clock.
+   */
+  protected static final String TIMESTAMP_TUPLE_FIELD = "timestamp";
+
   private OutputCollector collector;
 
   /**
@@ -62,7 +95,12 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   /**
    * The router responsible for routing incoming messages.
    */
-  private MessageRouter router;
+  private transient MessageRouter router;
+
+  /**
+   * Responsible for creating the {@link Clock}.
+   */
+  private transient ClockFactory clockFactory;
 
   /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt.
@@ -77,6 +115,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     this.collector = collector;
     this.parser = new JSONParser();
     this.router = new DefaultMessageRouter(getStellarContext());
+    this.clockFactory = new DefaultClockFactory();
   }
 
   private Context getStellarContext() {
@@ -88,13 +127,26 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
             .build();
   }
 
+  /**
+   * This bolt consumes telemetry messages and determines if the message is needed
+   * by any of the profiles.  The message is then routed to one or more downstream
+   * bolts that are responsible for building each profile.
+   *
+   * <p>The outgoing tuples are timestamped so that Storm's window and event-time
+   * processing functionality can recognize the time of each message.
+   *
+   * <p>The timestamp that is attached to each outgoing tuple is what decides if
+   * the Profiler is operating on processing time or event time.
+   *
+   * @param input The tuple.
+   */
   @Override
   public void execute(Tuple input) {
     try {
       doExecute(input);
 
     } catch (IllegalArgumentException | ParseException | UnsupportedEncodingException e) {
-      LOG.error("Unexpected failure: message='{}', tuple='{}'", e.getMessage(), input, e);
+      LOG.error("Unexpected error", e);
       collector.reportError(e);
 
     } finally {
@@ -103,41 +155,85 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   }
 
   private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingException {
+
     // retrieve the input message
     byte[] data = input.getBinary(0);
     JSONObject message = (JSONObject) parser.parse(new String(data, "UTF8"));
 
     // ensure there is a valid profiler configuration
     ProfilerConfig config = getProfilerConfig();
-    if(config != null) {
+    if(config != null && config.getProfiles().size() > 0) {
+
+      // what time is it?
+      Clock clock = clockFactory.createClock(config);
+      Optional<Long> timestamp = clock.currentTimeMillis(message);
 
-      // emit a message for each 'route'
-      List<MessageRoute> routes = router.route(message, config, getStellarContext());
-      for(MessageRoute route : routes) {
-        collector.emit(input, new Values(route.getEntity(), route.getProfileDefinition(), message));
-      }
+      // route the message.  if a message does not contain the timestamp field, it cannot be routed.
+      timestamp.ifPresent(ts -> routeMessage(input, message, config, ts));
 
     } else {
-      LOG.warn("No Profiler configuration found.  Nothing to do.");
+      LOG.debug("No Profiler configuration found.  Nothing to do.");
     }
   }
 
   /**
+   * Route a message based on the Profiler configuration.
+   * @param input The input tuple on which to anchor.
+   * @param message The telemetry message.
+   * @param config The Profiler configuration.
+   * @param timestamp The timestamp of the telemetry message.
+   */
+  private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config, Long timestamp) {
+
+    // emit a tuple for each 'route'
+    List<MessageRoute> routes = router.route(message, config, getStellarContext());
+    for (MessageRoute route : routes) {
+
+      Values values = createValues(message, timestamp, route);
+      collector.emit(input, values);
+    }
+
+    LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp);
+  }
+
+  /**
    * Each emitted tuple contains the following fields.
    * <p>
    * <ol>
-   * <li> entity - The name of the entity.  The actual result of executing the Stellar expression.
-   * <li> profile - The profile definition that the message needs applied to.
-   * <li> message - The message containing JSON-formatted data that needs applied to a profile.
+   * <li>message - The message containing JSON-formatted data that needs to be applied to a profile.
+   * <li>timestamp - The timestamp of the message.
+   * <li>entity - The name of the entity.  The actual result of executing the Stellar expression.
+   * <li>profile - The profile definition that the message needs to be applied to.
    * </ol>
    * <p>
    */
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("entity", "profile", "message"));
+
+    // the order here must match 'createValues'
+    Fields fields = new Fields(MESSAGE_TUPLE_FIELD, TIMESTAMP_TUPLE_FIELD, ENTITY_TUPLE_FIELD, PROFILE_TUPLE_FIELD);
+    declarer.declare(fields);
+  }
+
+  /**
+   * Creates the {@link Values} attached to the outgoing tuple.
+   *
+   * @param message The telemetry message.
+   * @param timestamp The timestamp of the message.
+   * @param route The route the message must take.
+   * @return The values to attach to the outgoing tuple.
+   */
+  private Values createValues(JSONObject message, Long timestamp, MessageRoute route) {
+
+    // the order here must match `declareOutputFields`
+    return new Values(message, timestamp, route.getEntity(), route.getProfileDefinition());
   }
 
   protected MessageRouter getMessageRouter() {
     return router;
   }
+
+  public void setClockFactory(ClockFactory clockFactory) {
+    this.clockFactory = clockFactory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
new file mode 100644
index 0000000..9d727a3
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
@@ -0,0 +1,12 @@
+{
+  "profiles": [
+    {
+      "profile": "event-time-test",
+      "foreach": "ip_src_addr",
+      "init":   { "counter": "0" },
+      "update": { "counter": "counter + 1" },
+      "result": "counter"
+    }
+  ],
+  "timestampField": "timestamp"
+}
\ No newline at end of file


Mime
View raw message