metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [34/52] [abbrv] metron git commit: METRON-1505 Intermittent Profiler Integration Test Failure (nickwallen) closes apache/metron#977
Date Wed, 18 Apr 2018 15:00:04 GMT
METRON-1505 Intermittent Profiler Integration Test Failure (nickwallen) closes apache/metron#977


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/46bc63db
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/46bc63db
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/46bc63db

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 46bc63dbcfe9f0ddabfd4821958962a2dac9378e
Parents: ab4f8e6
Author: nickwallen <nick@nickallen.org>
Authored: Sat Apr 7 11:28:01 2018 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Sat Apr 7 11:28:01 2018 -0400

----------------------------------------------------------------------
 .../profiler/DefaultMessageDistributor.java     |  54 +++-
 .../src/main/flux/profiler/remote.yaml          |   2 +
 .../profiler/bolt/ProfileBuilderBolt.java       | 149 +++++++---
 .../profiler/bolt/ProfileSplitterBolt.java      |   1 -
 .../config/zookeeper/percentiles/profiler.json  |  12 -
 .../processing-time-test/profiler.json          |  11 +
 .../zookeeper/readme-example-1/profiler.json    |  17 --
 .../zookeeper/readme-example-2/profiler.json    |  18 --
 .../zookeeper/readme-example-3/profiler.json    |  11 -
 .../zookeeper/readme-example-4/profiler.json    |  11 -
 .../profiler/bolt/ProfileBuilderBoltTest.java   | 130 +++------
 .../integration/ProfilerIntegrationTest.java    | 274 +++++--------------
 .../configuration/profiler/ProfileConfig.java   |  49 ++--
 .../ZKConfigurationsCacheIntegrationTest.java   |   4 +-
 .../org/apache/metron/hbase/bolt/HBaseBolt.java |  22 +-
 15 files changed, 319 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index ea5126f..70f4228 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -25,6 +25,7 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
@@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -74,7 +74,7 @@ public class DefaultMessageDistributor implements MessageDistributor {
    * messages.  Once it has not received messages for a period of time, it is
    * moved to the expired cache.
    */
-  private transient Cache<String, ProfileBuilder> activeCache;
+  private transient Cache<Integer, ProfileBuilder> activeCache;
 
   /**
    * A cache of expired profiles.
@@ -85,7 +85,7 @@ public class DefaultMessageDistributor implements MessageDistributor {
    * can flush the state of the expired profile.  If the client does not flush
    * the expired profiles, this state will be lost forever.
    */
-  private transient Cache<String, ProfileBuilder> expiredCache;
+  private transient Cache<Integer, ProfileBuilder> expiredCache;
 
   /**
    * Create a new message distributor.
@@ -222,7 +222,7 @@ public class DefaultMessageDistributor implements MessageDistributor {
    * @param cache The cache to flush.
    * @return The measurements captured when flushing the profiles.
    */
-  private List<ProfileMeasurement> flushCache(Cache<String, ProfileBuilder> cache) {
+  private List<ProfileMeasurement> flushCache(Cache<Integer, ProfileBuilder> cache) {
 
     List<ProfileMeasurement> measurements = new ArrayList<>();
     for(ProfileBuilder profileBuilder: cache.asMap().values()) {
@@ -262,11 +262,19 @@ public class DefaultMessageDistributor implements MessageDistributor {
   /**
    * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
    *
+   * <p>The cache key is built using the hash codes of the profile and entity name.  If the profile
+   * definition is ever changed, the same cache entry will not be reused.  This ensures that no
+   * state can be carried over from the old definition into the new, which might result in an
+   * invalid profile measurement.
+   *
    * @param profile The profile definition.
    * @param entity The entity.
    */
-  private String cacheKey(ProfileConfig profile, String entity) {
-    return format("%s:%s", profile, entity);
+  private int cacheKey(ProfileConfig profile, String entity) {
+    return new HashCodeBuilder(17, 37)
+            .append(profile)
+            .append(entity)
+            .hashCode();
   }
 
   public DefaultMessageDistributor withPeriodDurationMillis(long periodDurationMillis) {
@@ -281,29 +289,45 @@ public class DefaultMessageDistributor implements MessageDistributor {
   /**
    * A listener that is notified when profiles expire from the active cache.
    */
-  private class ActiveCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+  private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> {
 
     @Override
-    public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+    public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) {
 
-      String key = notification.getKey();
       ProfileBuilder expired = notification.getValue();
+      LOG.warn("Profile expired from active cache; profile={}, entity={}",
+              expired.getDefinition().getProfile(),
+              expired.getEntity());
 
-      LOG.warn("Profile expired from active cache; key={}", key);
-      expiredCache.put(key, expired);
+      // add the profile to the expired cache
+      expiredCache.put(notification.getKey(), expired);
     }
   }
 
   /**
    * A listener that is notified when profiles are removed from the expired cache.
    */
-  private class ExpiredCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+  private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> {
 
     @Override
-    public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+    public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) {
+
+      if(notification.wasEvicted()) {
+
+        // the expired profile was NOT flushed in time
+        ProfileBuilder expired = notification.getValue();
+        LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}",
+                expired.getDefinition().getProfile(),
+                expired.getEntity());
 
-      String key = notification.getKey();
-      LOG.debug("Profile removed from expired cache; key={}", key);
+      } else {
+
+        // the expired profile was flushed successfully
+        ProfileBuilder expired = notification.getValue();
+        LOG.debug("Expired profile successfully flushed; profile={}, entity={}",
+                expired.getDefinition().getProfile(),
+                expired.getEntity());
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 83c9fde..6ad007b 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -160,6 +160,8 @@ bolts:
               args: [ref: "windowLag"]
             - name: "withMaxNumberOfRoutes"
               args: [${profiler.max.routes.per.bolt}]
+            - name: "withTimestampField"
+              args: ["timestamp"]
 
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.bolt.HBaseBolt"

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index ffe823f..fb3d2d0 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -42,13 +42,13 @@ import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.zookeeper.SimpleEventListener;
 import org.apache.metron.zookeeper.ZKCache;
-import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.windowing.TupleWindow;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -60,6 +60,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static java.lang.String.format;
 import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
@@ -127,6 +129,9 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
 
   /**
    * Distributes messages to the profile builders.
+   *
+   * <p>Since expired profiles are flushed on a separate thread, all access to this
+   * {@code MessageDistributor} needs to be protected.
    */
   private MessageDistributor messageDistributor;
 
@@ -145,9 +150,21 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
   private List<ProfileMeasurementEmitter> emitters;
 
   /**
-   * Signals when it is time to flush.
+   * Signals when it is time to flush the active profiles.
+   */
+  private FlushSignal activeFlushSignal;
+
+  /**
+   * A timer that flushes expired profiles on a regular interval. The expired profiles
+   * are flushed on a separate thread.
+   *
+   * <p>Flushing expired profiles ensures that any profiles that stop receiving messages
+   * for an extended period of time will continue to be flushed.
+   *
+   * <p>This introduces concurrency issues as the bolt is no longer single threaded. Due
+   * to this, all access to the {@code MessageDistributor} needs to be protected.
    */
-  private FlushSignal flushSignal;
+  private StormTimer expiredFlushTimer;
 
   public ProfileBuilderBolt() {
     this.emitters = new ArrayList<>();
@@ -183,16 +200,26 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
     this.parser = new JSONParser();
     this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes);
     this.configurations = new ProfilerConfigurations();
-    this.flushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
+    this.activeFlushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
     setupZookeeper();
+    startExpiredFlushTimer();
   }
 
   @Override
   public void cleanup() {
-    zookeeperCache.close();
-    zookeeperClient.close();
+    try {
+      zookeeperCache.close();
+      zookeeperClient.close();
+      expiredFlushTimer.close();
+
+    } catch(Throwable e) {
+      LOG.error("Exception when cleaning up", e);
+    }
   }
 
+  /**
+   * Setup connectivity to Zookeeper which provides the necessary configuration for the bolt.
+   */
   private void setupZookeeper() {
     try {
       if (zookeeperClient == null) {
@@ -248,18 +275,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
     emitters.forEach(emitter -> emitter.declareOutputFields(declarer));
   }
 
-  /**
-   * Defines the frequency at which the bolt will receive tick tuples.  Tick tuples are
-   * used to control how often a profile is flushed.
-   */
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-
-    Map<String, Object> conf = super.getComponentConfiguration();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TimeUnit.MILLISECONDS.toSeconds(profileTimeToLiveMillis));
-    return conf;
-  }
-
   private Context getStellarContext() {
 
     Map<String, Object> global = getConfigurations().getGlobalConfig();
@@ -282,24 +297,12 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
 
       // handle each tuple in the window
       for(Tuple tuple : window.get()) {
-
-        if(TupleUtils.isTick(tuple)) {
-          handleTick();
-
-        } else {
-          handleMessage(tuple);
-        }
+        handleMessage(tuple);
       }
 
-      // time to flush?
-      if(flushSignal.isTimeToFlush()) {
-        flushSignal.reset();
-
-        // flush the active profiles
-        List<ProfileMeasurement> measurements = messageDistributor.flush();
-        emitMeasurements(measurements);
-
-        LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
+      // time to flush active profiles?
+      if(activeFlushSignal.isTimeToFlush()) {
+        flushActive();
       }
 
     } catch (Throwable e) {
@@ -310,17 +313,37 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
   }
 
   /**
-   * Flush all expired profiles when a 'tick' is received.
+   * Flush all active profiles.
+   */
+  protected void flushActive() {
+    activeFlushSignal.reset();
+
+    // flush the active profiles
+    List<ProfileMeasurement> measurements;
+    synchronized(messageDistributor) {
+      measurements = messageDistributor.flush();
+      emitMeasurements(measurements);
+    }
+
+    LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
+
+  }
+
+  /**
+   * Flushes all expired profiles.
    *
-   * If a profile has not received a message for an extended period of time then it is
+   * <p>If a profile has not received a message for an extended period of time then it is
    * marked as expired.  Periodically we need to flush these expired profiles to ensure
    * that their state is not lost.
    */
-  private void handleTick() {
+  protected void flushExpired() {
 
     // flush the expired profiles
-    List<ProfileMeasurement> measurements = messageDistributor.flushExpired();
-    emitMeasurements(measurements);
+    List<ProfileMeasurement> measurements;
+    synchronized (messageDistributor) {
+      measurements = messageDistributor.flushExpired();
+      emitMeasurements(measurements);
+    }
 
     LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size());
   }
@@ -339,11 +362,13 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
     Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class);
 
     // keep track of time
-    flushSignal.update(timestamp);
+    activeFlushSignal.update(timestamp);
     
     // distribute the message
     MessageRoute route = new MessageRoute(definition, entity);
-    messageDistributor.distribute(message, timestamp, route, getStellarContext());
+    synchronized (messageDistributor) {
+      messageDistributor.distribute(message, timestamp, route, getStellarContext());
+    }
 
     LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp);
   }
@@ -395,10 +420,46 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
     return value;
   }
 
+  /**
+   * Converts milliseconds to seconds and handles an ugly cast.
+   *
+   * @param millis Duration in milliseconds.
+   * @return Duration in seconds.
+   */
+  private int toSeconds(long millis) {
+    return (int) TimeUnit.MILLISECONDS.toSeconds(millis);
+  }
+
+  /**
+   * Creates a timer that regularly flushes expired profiles on a separate thread.
+   */
+  private void startExpiredFlushTimer() {
+
+    expiredFlushTimer = createTimer("flush-expired-profiles-timer");
+    expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired());
+  }
+
+  /**
+   * Creates a timer that can execute a task on a fixed interval.
+   *
+   * <p>If the timer encounters an exception, the entire process will be killed.
+   *
+   * @param name The name of the timer.
+   * @return The timer.
+   */
+  private StormTimer createTimer(String name) {
+
+    return new StormTimer(name, (thread, exception) -> {
+      String msg = String.format("Unexpected exception in timer task; timer=%s", name);
+      LOG.error(msg, exception);
+      Utils.exitProcess(1, msg);
+    });
+  }
+
   @Override
   public BaseWindowedBolt withTumblingWindow(BaseWindowedBolt.Duration duration) {
 
-    // need to capture the window duration for setting the flush count down
+    // need to capture the window duration to validate it along with other profiler settings
     this.windowDurationMillis = duration.value;
     return super.withTumblingWindow(duration);
   }
@@ -464,7 +525,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
   }
 
   public ProfileBuilderBolt withFlushSignal(FlushSignal flushSignal) {
-    this.flushSignal = flushSignal;
+    this.activeFlushSignal = flushSignal;
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index 4e62eee..a92a432 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -21,7 +21,6 @@
 package org.apache.metron.profiler.bolt;
 
 import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.profiler.DefaultMessageRouter;
 import org.apache.metron.profiler.MessageRoute;

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json
deleted file mode 100644
index 8a54834..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "percentiles",
-      "foreach": "ip_src_addr",
-      "onlyif": "protocol == 'HTTP'",
-      "init":   { "s": "STATS_INIT(100)" },
-      "update": { "s": "STATS_ADD(s, length)" },
-      "result": "STATS_PERCENTILE(s, 0.7)"
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
new file mode 100644
index 0000000..e75ec0f
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
@@ -0,0 +1,11 @@
+{
+  "profiles": [
+    {
+      "profile": "processing-time-test",
+      "foreach": "ip_src_addr",
+      "init":   { "counter": "0" },
+      "update": { "counter": "counter + 1" },
+      "result": "counter"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json
deleted file mode 100644
index 96c60a1..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "example1",
-      "foreach": "ip_src_addr",
-      "onlyif": "protocol == 'HTTP'",
-      "init": {
-        "total_bytes": 0.0
-      },
-      "update": {
-        "total_bytes": "total_bytes + bytes_in"
-      },
-      "result": "total_bytes",
-      "expires": 30
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json
deleted file mode 100644
index e5d8f39..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "example2",
-      "foreach": "ip_src_addr",
-      "onlyif": "protocol == 'DNS' or protocol == 'HTTP'",
-      "init": {
-        "num_dns": 1.0,
-        "num_http": 1.0
-      },
-      "update": {
-        "num_dns": "num_dns + (if protocol == 'DNS' then 1 else 0)",
-        "num_http": "num_http + (if protocol == 'HTTP' then 1 else 0)"
-      },
-      "result": "num_dns / num_http"
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
deleted file mode 100644
index 67cdefd..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "example3",
-      "foreach": "ip_src_addr",
-      "onlyif": "protocol == 'HTTP'",
-      "update": { "s": "STATS_ADD(s, length)" },
-      "result": "STATS_MEAN(s)"
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
deleted file mode 100644
index b003ce0..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "example4",
-      "foreach": "ip_src_addr",
-      "onlyif": "protocol == 'HTTP'",
-      "update": { "s": "STATS_ADD(s, length)" },
-      "result": "s"
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 78e20e0..3d009fb 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -67,6 +66,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
   private ProfileConfig profile2;
   private ProfileMeasurementEmitter emitter;
   private ManualFlushSignal flushSignal;
+  private ProfileMeasurement measurement;
 
   @Before
   public void setup() throws Exception {
@@ -95,6 +95,12 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
             .withUpdate(Collections.singletonMap("x", "x + 1"))
             .withResult("x");
 
+    measurement = new ProfileMeasurement()
+            .withEntity("entity1")
+            .withProfileName("profile1")
+            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+            .withProfileValue(22);
+
     flushSignal = new ManualFlushSignal();
     flushSignal.setFlushNow(false);
   }
@@ -127,23 +133,16 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
 
   /**
    * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
-   * and emit the {@code ProfileMeasurement} values.
+   * and emit the {@code ProfileMeasurement} values from all active profiles.
    */
   @Test
-  public void testEmitWhenFlush() throws Exception {
+  public void testFlushActiveProfiles() throws Exception {
 
     ProfileBuilderBolt bolt = createBolt();
 
-    // create a profile measurement
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withEntity("entity1")
-            .withProfileName("profile1")
-            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
-            .withProfileValue(22);
-
     // create a mock that returns the profile measurement above
     MessageDistributor distributor = mock(MessageDistributor.class);
-    when(distributor.flush()).thenReturn(Collections.singletonList(m));
+    when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
     bolt.withMessageDistributor(distributor);
 
     // signal the bolt to flush
@@ -157,30 +156,23 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     // a profile measurement should be emitted by the bolt
     List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
     assertEquals(1, measurements.size());
-    assertEquals(m, measurements.get(0));
+    assertEquals(measurement, measurements.get(0));
   }
 
   /**
    * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
    */
   @Test
-  public void testDoNotEmitWhenNoFlush() throws Exception {
+  public void testDoNotFlushActiveProfiles() throws Exception {
 
     ProfileBuilderBolt bolt = createBolt();
 
-    // create a profile measurement
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withEntity("entity1")
-            .withProfileName("profile1")
-            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
-            .withProfileValue(22);
-
-    // create a mock that returns the profile measurement above
+    // create a mock where flush() returns the profile measurement above
     MessageDistributor distributor = mock(MessageDistributor.class);
-    when(distributor.flush()).thenReturn(Collections.singletonList(m));
+    when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
     bolt.withMessageDistributor(distributor);
 
-    // no flush signal
+    // there is no flush signal
     flushSignal.setFlushNow(false);
 
     // execute the bolt
@@ -193,6 +185,29 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
   }
 
   /**
+   * Expired profiles should be flushed regularly, even if no input telemetry
+   * has been received.
+   */
+  @Test
+  public void testFlushExpiredProfiles() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a mock where flushExpired() returns the profile measurement above
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
+    bolt.withMessageDistributor(distributor);
+
+    // execute test by flushing expired profiles. this is normally triggered by a timer task.
+    bolt.flushExpired();
+
+    // a profile measurement should be emitted by the bolt
+    List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
+    assertEquals(1, measurements.size());
+    assertEquals(measurement, measurements.get(0));
+  }
+
+  /**
    * A {@link ProfileMeasurement} is built for each profile/entity pair.  The measurement should be emitted to each
    * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
    */
@@ -232,61 +247,6 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     verify(outputCollector, times(1)).emit(eq("destination3"), any());
   }
 
-  @Test
-  public void testFlushExpiredWithTick() throws Exception {
-
-    ProfileBuilderBolt bolt = createBolt();
-
-    // create a mock
-    MessageDistributor distributor = mock(MessageDistributor.class);
-    bolt.withMessageDistributor(distributor);
-
-    // tell the bolt to flush on the first window
-    flushSignal.setFlushNow(true);
-
-    // execute the bolt; include a tick tuple in the window
-    Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
-    TupleWindow tupleWindow = createWindow(tuple1, mockTickTuple());
-    bolt.execute(tupleWindow);
-
-    // ensure the expired profiles were flushed when the tick tuple was received
-    verify(distributor).flushExpired();
-  }
-
-  @Test
-  public void testFlushExpiredWithNoTick() throws Exception {
-
-    ProfileBuilderBolt bolt = createBolt();
-
-    // create a mock
-    MessageDistributor distributor = mock(MessageDistributor.class);
-    bolt.withMessageDistributor(distributor);
-
-    // tell the bolt to flush on the first window
-    flushSignal.setFlushNow(true);
-
-    // execute the bolt; NO tick tuple
-    Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
-    TupleWindow tupleWindow = createWindow(tuple1);
-    bolt.execute(tupleWindow);
-
-    // there was no tick tuple; the expired profiles should NOT have been flushed
-    verify(distributor, times(0)).flushExpired();
-  }
-
-  /**
-   * Creates a mock tick tuple to use for testing.
-   * @return A mock tick tuple.
-   */
-  private Tuple mockTickTuple() {
-
-    Tuple tuple = mock(Tuple.class);
-    when(tuple.getSourceComponent()).thenReturn("__system");
-    when(tuple.getSourceStreamId()).thenReturn("__tick");
-
-    return tuple;
-  }
-
   /**
    * Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
    *
@@ -334,18 +294,6 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
    */
   private ProfileBuilderBolt createBolt() throws IOException {
 
-    return createBolt(30, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Create a ProfileBuilderBolt to test.
-   *
-   * @param windowDuration The event window duration.
-   * @param windowDurationUnits The units of the event window duration.
-   * @return A {@link ProfileBuilderBolt} to test.
-   */
-  private ProfileBuilderBolt createBolt(int windowDuration, TimeUnit windowDurationUnits) throws IOException {
-
     // defines the zk configurations accessible from the bolt
     ProfilerConfigurations configurations = new ProfilerConfigurations();
     configurations.updateGlobalConfig(Collections.emptyMap());
@@ -359,7 +307,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
             .withEmitter(emitter)
             .withProfilerConfigurations(configurations)
             .withPeriodDuration(1, TimeUnit.MINUTES)
-            .withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, windowDurationUnits));
+            .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS));
     bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 
     // set the flush signal AFTER calling 'prepare'

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index c48a3e9..8f5ced3 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -20,9 +20,6 @@
 
 package org.apache.metron.profiler.integration;
 
-import com.google.common.base.Joiner;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.math.util.MathUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -41,10 +38,8 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.statistics.OnlineStatisticsProvider;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -61,6 +56,7 @@ import static com.google.code.tempusfugit.temporal.Timeout.timeout;
 import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * An integration test of the Profiler topology.
@@ -70,247 +66,103 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler/src/test";
   private static final String FLUX_PATH = "../metron-profiler/src/main/flux/profiler/remote.yaml";
 
-  /**
-   * {
-   * "ip_src_addr": "10.0.0.1",
-   * "protocol": "HTTPS",
-   * "length": 10,
-   * "bytes_in": 234
-   * }
-   */
-  @Multiline
-  private static String message1;
-
-  /**
-   * {
-   * "ip_src_addr": "10.0.0.2",
-   * "protocol": "HTTP",
-   * "length": 20,
-   * "bytes_in": 390
-   * }
-   */
-  @Multiline
-  private static String message2;
-
-  /**
-   * {
-   * "ip_src_addr": "10.0.0.3",
-   * "protocol": "DNS",
-   * "length": 30,
-   * "bytes_in": 560
-   * }
-   */
-  @Multiline
-  private static String message3;
-
-  private static ColumnBuilder columnBuilder;
-  private static ZKServerComponent zkComponent;
-  private static FluxTopologyComponent fluxComponent;
-  private static KafkaComponent kafkaComponent;
-  private static ConfigUploadComponent configUploadComponent;
-  private static ComponentRunner runner;
-  private static MockHTable profilerTable;
+  public static final long startAt = 10;
+  public static final String entity = "10.0.0.1";
 
   private static final String tableName = "profiler";
   private static final String columnFamily = "P";
-  private static final double epsilon = 0.001;
   private static final String inputTopic = Constants.INDEXING_TOPIC;
   private static final String outputTopic = "profiles";
   private static final int saltDivisor = 10;
 
-  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5);
+  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
   private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5);
-  private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15);
-  private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
+  private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10);
+  private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15);
   private static final long maxRoutesPerBolt = 100000;
 
-  /**
-   * Tests the first example contained within the README.
-   */
-  @Test
-  public void testExample1() throws Exception {
-
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(180)));
-
-    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
-            columnBuilder.getColumnQualifier("value"), Double.class);
-
-    // verify - there are 3 'HTTP' each with 390 bytes
-    Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(390.0 * 3, val, epsilon)
-    ));
-  }
-
-  /**
-   * Tests the second example contained within the README.
-   */
-  @Test
-  public void testExample2() throws Exception {
-
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
-    // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
-    final int expected = 2;
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected,
-            timeout(seconds(90)));
-
-    // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
-            columnBuilder.getColumnQualifier("value"), Double.class);
-
-    // verify - 10.0.0.3 -> 1/4
-    Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals),
-            actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon)
-    ));
-
-    // verify - 10.0.0.2 -> 4/1
-    Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals),
-            actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon)
-    ));
-  }
-
-  /**
-   * Tests the third example contained within the README.
-   */
-  @Test
-  public void testExample3() throws Exception {
-
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
+  private static ColumnBuilder columnBuilder;
+  private static ZKServerComponent zkComponent;
+  private static FluxTopologyComponent fluxComponent;
+  private static KafkaComponent kafkaComponent;
+  private static ConfigUploadComponent configUploadComponent;
+  private static ComponentRunner runner;
+  private static MockHTable profilerTable;
 
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
-
-    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
-            columnBuilder.getColumnQualifier("value"), Double.class);
-
-    // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
-            actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
-    ));
-  }
+  private static String message1;
+  private static String message2;
+  private static String message3;
 
   /**
-   * Tests the fourth example contained within the README.
+   * The Profiler can generate profiles based on processing time.  With processing time,
+   * the Profiler builds profiles based on when the telemetry was processed.
+   *
+   * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+   * to use processing time.
    */
   @Test
-  public void testExample4() throws Exception {
+  public void testProcessingTime() throws Exception {
 
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
+    // upload the config to zookeeper
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
-
-    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    byte[] column = columnBuilder.getColumnQualifier("value");
-    List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class);
-
-    // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
-            actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
-    ));
-  }
 
-  @Test
-  public void testPercentiles() throws Exception {
-
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/percentiles");
+    // the messages that will be applied to the profile
+    kafkaComponent.writeMessages(inputTopic, message1);
+    kafkaComponent.writeMessages(inputTopic, message2);
+    kafkaComponent.writeMessages(inputTopic, message3);
 
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
+    // storm needs at least one message to close its event window
+    int attempt = 0;
+    while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
 
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
+      // sleep, at least beyond the current window
+      Thread.sleep(windowDurationMillis + windowLagMillis);
 
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
-            columnBuilder.getColumnQualifier("value"), Double.class);
+      // send another message to help close the current event window
+      kafkaComponent.writeMessages(inputTopic, message2);
+    }
 
-    // verify - the 70th percentile of x3, 20s = 20.0
-    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
-            actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
+    // validate what was flushed
+    List<Integer> actuals = read(
+            profilerTable.getPutLog(),
+            columnFamily,
+            columnBuilder.getColumnQualifier("value"),
+            Integer.class);
+    assertEquals(1, actuals.size());
+    assertTrue(actuals.get(0) >= 3);
   }
 
   /**
-   * The Profiler can optionally perform event time processing.  With event time processing,
+   * The Profiler can generate profiles using event time.  With event time processing,
    * the Profiler uses timestamps contained in the source telemetry.
    *
    * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler
    * from which field the timestamp should be extracted.
    */
   @Test
-  public void testEventTimeProcessing() throws Exception {
-
-    // constants used for the test
-    final long startAt = 10;
-    final String entity = "10.0.0.1";
-    final String profileName = "event-time-test";
-
-    // create some messages that contain a timestamp - a really old timestamp; close to 1970
-    String message1 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt)
-            .build()
-            .toJSONString();
-
-    String message2 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt + 100)
-            .build()
-            .toJSONString();
+  public void testEventTime() throws Exception {
 
+    // upload the profiler config to zookeeper
     uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1, message2);
+    kafkaComponent.writeMessages(inputTopic, message1);
+    kafkaComponent.writeMessages(inputTopic, message2);
+    kafkaComponent.writeMessages(inputTopic, message3);
 
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
+    // wait until the profile is flushed
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
 
     List<Put> puts = profilerTable.getPutLog();
     assertEquals(1, puts.size());
 
     // inspect the row key to ensure the profiler used event time correctly.  the timestamp
     // embedded in the row key should match those in the source telemetry
-    byte[] expectedRowKey = generateExpectedRowKey(profileName, entity, startAt);
+    byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt);
     byte[] actualRowKey = puts.get(0).getRow();
     String msg = String.format("expected '%s', got '%s'",
             new String(expectedRowKey, "UTF-8"),
@@ -364,6 +216,26 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
   @BeforeClass
   public static void setupBeforeClass() throws UnableToStartException {
+
+    // create some messages that contain a timestamp - a really old timestamp; close to 1970
+    message1 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt)
+            .build()
+            .toJSONString();
+
+    message2 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt + 100)
+            .build()
+            .toJSONString();
+
+    message3 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt + (windowDurationMillis * 2))
+            .build()
+            .toJSONString();
+
     columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
 
     // storm topology properties

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index 6205fbf..f5b46e6 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -18,6 +18,8 @@
 package org.apache.metron.common.configuration.profiler;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -225,32 +227,39 @@ public class ProfileConfig implements Serializable {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
 
-    ProfileConfig that = (ProfileConfig) o;
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
-    if (profile != null ? !profile.equals(that.profile) : that.profile != null) return false;
-    if (foreach != null ? !foreach.equals(that.foreach) : that.foreach != null) return false;
-    if (onlyif != null ? !onlyif.equals(that.onlyif) : that.onlyif != null) return false;
-    if (init != null ? !init.equals(that.init) : that.init != null) return false;
-    if (update != null ? !update.equals(that.update) : that.update != null) return false;
-    if (groupBy != null ? !groupBy.equals(that.groupBy) : that.groupBy != null) return false;
-    if (result != null ? !result.equals(that.result) : that.result != null) return false;
-    return expires != null ? expires.equals(that.expires) : that.expires == null;
+    ProfileConfig that = (ProfileConfig) o;
+    return new EqualsBuilder()
+            .append(profile, that.profile)
+            .append(foreach, that.foreach)
+            .append(onlyif, that.onlyif)
+            .append(init, that.init)
+            .append(update, that.update)
+            .append(groupBy, that.groupBy)
+            .append(result, that.result)
+            .append(expires, that.expires)
+            .isEquals();
   }
 
   @Override
   public int hashCode() {
-    int result1 = profile != null ? profile.hashCode() : 0;
-    result1 = 31 * result1 + (foreach != null ? foreach.hashCode() : 0);
-    result1 = 31 * result1 + (onlyif != null ? onlyif.hashCode() : 0);
-    result1 = 31 * result1 + (init != null ? init.hashCode() : 0);
-    result1 = 31 * result1 + (update != null ? update.hashCode() : 0);
-    result1 = 31 * result1 + (groupBy != null ? groupBy.hashCode() : 0);
-    result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
-    result1 = 31 * result1 + (expires != null ? expires.hashCode() : 0);
-    return result1;
+    return new HashCodeBuilder(17, 37)
+            .append(profile)
+            .append(foreach)
+            .append(onlyif)
+            .append(init)
+            .append(update)
+            .append(groupBy)
+            .append(result)
+            .append(expires)
+            .toHashCode();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
index ec4a98a..5240d7a 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
@@ -154,7 +154,7 @@ public class ZKConfigurationsCacheIntegrationTest {
     }
     {
       //profiler
-      byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/readme-example-1/profiler.json")));
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/event-time-test/profiler.json")));
       ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client);
     }
     {
@@ -284,7 +284,7 @@ public class ZKConfigurationsCacheIntegrationTest {
     }
     //profiler
     {
-      File inFile = new File(profilerDir, "/readme-example-1/profiler.json");
+      File inFile = new File(profilerDir, "/event-time-test/profiler.json");
       ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class);
       ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
       assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig()));

http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index d16e2f6..6953b18 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -24,7 +24,7 @@ import java.lang.invoke.MethodHandles;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.metron.hbase.HTableProvider;
@@ -77,6 +77,8 @@ public class HBaseBolt extends BaseRichBolt {
 
   /**
    * The name of the class that should be used as a table provider.
+   *
+   * <p>Defaults to 'org.apache.metron.hbase.HTableProvider'.
    */
   protected String tableProviderClazzName = "org.apache.metron.hbase.HTableProvider";
 
@@ -126,6 +128,8 @@ public class HBaseBolt extends BaseRichBolt {
 
   @Override
   public Map<String, Object> getComponentConfiguration() {
+    LOG.debug("Tick tuples expected every {} second(s)", flushIntervalSecs);
+
     Config conf = new Config();
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
     return conf;
@@ -136,7 +140,13 @@ public class HBaseBolt extends BaseRichBolt {
     this.collector = collector;
     this.batchHelper = new BatchHelper(batchSize, collector);
 
-    TableProvider provider = this.tableProvider == null ?getTableProvider(tableProviderClazzName):this.tableProvider;
+    TableProvider provider;
+    if(this.tableProvider == null) {
+      provider = createTableProvider(tableProviderClazzName);
+    } else {
+      provider = this.tableProvider;
+    }
+
     hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName);
   }
 
@@ -147,6 +157,8 @@ public class HBaseBolt extends BaseRichBolt {
 
   @Override
   public void execute(Tuple tuple) {
+    LOG.trace("Received a tuple.");
+
     try {
       if (batchHelper.shouldHandle(tuple)) {
         save(tuple);
@@ -179,12 +191,15 @@ public class HBaseBolt extends BaseRichBolt {
     }
 
     batchHelper.addBatch(tuple);
+    LOG.debug("Added mutation to the batch; size={}", batchHelper.getBatchSize());
   }
 
   /**
    * Flush all saved operations.
    */
   private void flush() {
+    LOG.debug("About to flush a batch of {} mutation(s)", batchHelper.getBatchSize());
+
     this.hbaseClient.mutate();
     batchHelper.ack();
   }
@@ -193,7 +208,8 @@ public class HBaseBolt extends BaseRichBolt {
    * Creates a TableProvider based on a class name.
    * @param connectorImpl The class name of a TableProvider
    */
-  private static TableProvider getTableProvider(String connectorImpl) {
+  private static TableProvider createTableProvider(String connectorImpl) {
+    LOG.trace("Creating table provider; className={}", connectorImpl);
 
     // if class name not defined, use a reasonable default
     if(StringUtils.isEmpty(connectorImpl) || connectorImpl.charAt(0) == '$') {


Mime
View raw message