metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [39/52] [abbrv] metron git commit: METRON-1494 Profiler Emits Messages to Kafka When Not Needed (nickwallen) closes apache/metron#967
Date Wed, 18 Apr 2018 15:00:09 GMT
METRON-1494 Profiler Emits Messages to Kafka When Not Needed (nickwallen) closes apache/metron#967


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/62d1a1bf
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/62d1a1bf
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/62d1a1bf

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 62d1a1bf7e8b9b3ee2f260c358719ea5080c9045
Parents: 438893b
Author: nickwallen <nick@nickallen.org>
Authored: Wed Apr 11 17:57:09 2018 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Wed Apr 11 17:57:09 2018 -0400

----------------------------------------------------------------------
 .../metron/profiler/DefaultProfileBuilder.java  |   5 +
 .../bolt/FixedFrequencyFlushSignal.java         |  13 +-
 .../metron/profiler/bolt/HBaseEmitter.java      |  12 +-
 .../metron/profiler/bolt/KafkaEmitter.java      |  78 +++++--
 .../profiler/bolt/ProfileSplitterBolt.java      |   5 +
 .../metron/profiler/bolt/HBaseEmitterTest.java  | 120 +++++++++++
 .../metron/profiler/bolt/KafkaEmitterTest.java  | 201 +++++++++++++------
 7 files changed, 358 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
index 4b564c9..66034ac 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
@@ -124,8 +124,13 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable
{
    */
   @Override
   public void apply(JSONObject message, long timestamp) {
+    LOG.debug("Applying message to profile; profile={}, entity={}, timestamp={}",
+            profileName, entity, timestamp);
+
     try {
       if (!isInitialized()) {
+        LOG.debug("Initializing profile; profile={}, entity={}, timestamp={}",
+                profileName, entity, timestamp);
 
         // execute each 'init' expression
         assign(definition.getInit(), message, "init");

http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
index b9f57dd..8c0a0b1 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
@@ -94,7 +94,8 @@ public class FixedFrequencyFlushSignal implements FlushSignal {
 
       // set the next time to flush
       flushTime = currentTime + flushFrequency;
-      LOG.debug("Setting flush time; flushTime={}, currentTime={}, flushFreq={}",
+      LOG.debug("Setting flush time; '{}' ms until flush; flushTime={}, currentTime={}, flushFreq={}",
+              timeToNextFlush(),
               flushTime,
               currentTime,
               flushFrequency);
@@ -112,7 +113,7 @@ public class FixedFrequencyFlushSignal implements FlushSignal {
     boolean flush = currentTime > flushTime;
     LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
             flush,
-            flush ? 0 : (flushTime-currentTime),
+            timeToNextFlush(),
             currentTime,
             flushTime);
 
@@ -123,4 +124,12 @@ public class FixedFrequencyFlushSignal implements FlushSignal {
   public long currentTimeMillis() {
     return currentTime;
   }
+
+  /**
+   * Returns the number of milliseconds to the next flush.
+   * @return The time left until the next flush.
+   */
+  private long timeToNextFlush() {
+    return Math.max(0, flushTime - currentTime);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
index 8e1229a..e4e3552 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
@@ -40,7 +40,7 @@ public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable
{
   /**
    * The stream identifier used for this destination;
    */
-  private  String streamId = "hbase";
+  private String streamId = "hbase";
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -49,7 +49,17 @@ public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable
{
 
   @Override
   public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    // measurements are always emitted to hbase
     collector.emit(getStreamId(), new Values(measurement));
+
+    LOG.debug("Emitted measurement; stream={}, profile={}, entity={}, period={}, start={},
end={}",
+            getStreamId(),
+            measurement.getProfileName(),
+            measurement.getEntity(),
+            measurement.getPeriod().getPeriod(),
+            measurement.getPeriod().getStartTimeMillis(),
+            measurement.getPeriod().getEndTimeMillis());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
index 29d1a49..87920da 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
@@ -19,8 +19,7 @@
 
 package org.apache.metron.profiler.bolt;
 
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.storm.task.OutputCollector;
@@ -31,6 +30,10 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
 /**
  * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will
  * persist data in HBase.
@@ -58,19 +61,48 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable
{
   @Override
   public void emit(ProfileMeasurement measurement, OutputCollector collector) {
 
-    JSONObject message = new JSONObject();
-    message.put("profile", measurement.getDefinition().getProfile());
-    message.put("entity", measurement.getEntity());
-    message.put("period", measurement.getPeriod().getPeriod());
-    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
-    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
-    message.put("timestamp", System.currentTimeMillis());
-    message.put("source.type", sourceType);
-    message.put("is_alert", "true");
+    // only need to emit, if there are triage values
+    Map<String, Object> triageValues = measurement.getTriageValues();
+    if(MapUtils.isNotEmpty(triageValues)) {
+
+      JSONObject message = createMessage(measurement);
+      appendTriageValues(measurement, message);
+      collector.emit(getStreamId(), new Values(message));
+
+      LOG.debug("Emitted measurement; stream={}, profile={}, entity={}, period={}, start={},
end={}",
+              getStreamId(),
+              measurement.getProfileName(),
+              measurement.getEntity(),
+              measurement.getPeriod().getPeriod(),
+              measurement.getPeriod().getStartTimeMillis(),
+              measurement.getPeriod().getEndTimeMillis());
+
+    } else {
+
+      LOG.debug("No triage values, nothing to emit; stream={}, profile={}, entity={}, period={},
start={}, end={}",
+              getStreamId(),
+              measurement.getProfileName(),
+              measurement.getEntity(),
+              measurement.getPeriod().getPeriod(),
+              measurement.getPeriod().getStartTimeMillis(),
+              measurement.getPeriod().getEndTimeMillis());
+    }
+  }
 
-    // append each of the triage values to the message
-    measurement.getTriageValues().forEach((key, value) -> {
+  /**
+   * Appends triage values obtained from a {@code ProfileMeasurement} to the
+   * outgoing message.
+   *
+   * @param measurement The measurement that may contain triage values.
+   * @param message The message that the triage values are appended to.
+   */
+  private void appendTriageValues(ProfileMeasurement measurement, JSONObject message) {
+
+    // for each triage value...
+    Map<String, Object> triageValues = MapUtils.emptyIfNull(measurement.getTriageValues());
+    triageValues.forEach((key, value) -> {
 
+      // append the triage value to the message
       if(isValidType(value)) {
         message.put(key, value);
 
@@ -83,8 +115,26 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable
{
                 key));
       }
     });
+  }
+
+  /**
+   * Creates a message that will be emitted to Kafka.
+   *
+   * @param measurement The profile measurement used as a basis for the message.
+   * @return A message that can be emitted to Kafka.
+   */
+  private JSONObject createMessage(ProfileMeasurement measurement) {
 
-    collector.emit(getStreamId(), new Values(message));
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+    return message;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index a92a432..f28411f 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -190,6 +190,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
 
       Values values = createValues(message, timestamp, route);
       collector.emit(input, values);
+
+      LOG.debug("Found route for message; profile={}, entity={}, timestamp={}",
+              route.getProfileDefinition().getProfile(),
+              route.getEntity(),
+              timestamp);
     }
 
     LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp);

http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
new file mode 100644
index 0000000..35ca4d9
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the HBaseEmitter class.
+ */
+public class HBaseEmitterTest {
+
+  /**
+   * {
+   *   "profile": "profile-one",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileDefinition;
+
+  private HBaseEmitter emitter;
+  private ProfileConfig profile;
+  private OutputCollector collector;
+
+  @Before
+  public void setup() throws Exception {
+    emitter = new HBaseEmitter();
+    profile = createDefinition(profileDefinition);
+    collector = Mockito.mock(OutputCollector.class);
+  }
+
+  /**
+   * The handler should emit a message containing the result of executing
+   * the 'result/profile' expression.
+   */
+  @Test
+  public void testEmit() throws Exception {
+
+    // create a measurement that has triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withDefinition(profile)
+            .withProfileValue(22);
+
+    // execute the test
+    emitter.emit(measurement, collector);
+
+    // the measurement should be emitted as-is
+    ProfileMeasurement actual = expectMeasurement(emitter, collector);
+    assertEquals(measurement, actual);
+  }
+
+  /**
+   * Verifies that the emitter does emit a {@code ProfileMeasurement}.
+   *
+   * @return The {@code ProfileMeasurement} that was emitted
+   */
+  private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, OutputCollector
collector) {
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof ProfileMeasurement);
+    return (ProfileMeasurement) values.get(0);
+  }
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
index b02e377..95a2d29 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
@@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -58,54 +59,128 @@ public class KafkaEmitterTest {
    *   "foreach": "ip_src_addr",
    *   "init":   { "x": "0" },
    *   "update": { "x": "x + 1" },
-   *   "result": "x"
+   *   "result": {
+   *      "profile": "x",
+   *      "triage": {
+   *        "value": "x"
+   *       }
+   *    }
    * }
    */
   @Multiline
-  private String profileDefinition;
+  private String profileDefinitionWithTriage;
 
-  private KafkaEmitter handler;
+  private KafkaEmitter kafkaEmitter;
   private ProfileConfig profile;
   private OutputCollector collector;
 
   @Before
   public void setup() throws Exception {
-    handler = new KafkaEmitter();
-    profile = createDefinition(profileDefinition);
+    kafkaEmitter = new KafkaEmitter();
+    profile = createDefinition(profileDefinitionWithTriage);
     collector = Mockito.mock(OutputCollector.class);
   }
 
   /**
-   * The handler must serialize the ProfileMeasurement into a JSONObject.
+   * The handler should emit a message when a result/triage expression(s) has been defined.
    */
   @Test
-  public void testSerialization() throws Exception {
+  public void testEmit() throws Exception {
 
+    // create a measurement that has triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withDefinition(profile)
+            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"));
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+
+    // a message should be emitted
+    verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any());
+  }
+
+  /**
+   * The handler should NOT emit a message when there is NO result/triage value(s).
+   */
+  @Test
+  public void testDoNotEmit() throws Exception {
+
+    // create a measurement with NO triage values
     ProfileMeasurement measurement = new ProfileMeasurement()
             .withProfileName("profile")
             .withEntity("entity")
             .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
             .withDefinition(profile);
-    handler.emit(measurement, collector);
 
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
 
-    // expect a JSONObject
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
+    // a message should NOT be emitted
+    verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any());
+  }
 
-    // validate the json
-    JSONObject actual = (JSONObject) values.get(0);
-    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
-    assertEquals(measurement.getEntity(), actual.get("entity"));
-    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
-    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
-    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  /**
+   * Validate that the message generated for Kafka should include the triage value.
+   */
+  @Test
+  public void testTriageValueInMessage() throws Exception {
+
+    // create a measurement that has triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"));
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // validate the core parts of the message
+    assertEquals(measurement.getProfileName(),                    actual.get("profile"));
+    assertEquals(measurement.getEntity(),                         actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(),             actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(),    actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(),      actual.get("period.end"));
+    assertEquals("profiler",                                      actual.get("source.type"));
     assertNotNull(actual.get("timestamp"));
-    assertEquals("profiler", actual.get("source.type"));
+
+    // validate that the triage value has been added
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * Validate that the message generated for Kafka can include multiple triage values.
+   */
+  @Test
+  public void testMultipleTriageValueInMessage() throws Exception {
+
+    // multiple triage values have been defined
+    Map<String, Object> triageValues = ImmutableMap.of(
+            "x", 2,
+            "y", "4",
+            "z", 6.0);
+
+    // create a measurement that has multiple triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(triageValues);
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // validate that ALL of the triage values have been added
+    assertEquals(measurement.getTriageValues().get("x"), actual.get("x"));
+    assertEquals(measurement.getTriageValues().get("y"), actual.get("y"));
+    assertEquals(measurement.getTriageValues().get("z"), actual.get("z"));
   }
 
   /**
@@ -120,30 +195,27 @@ public class KafkaEmitterTest {
             "invalid", new OnlineStatisticsProvider(),
             "valid", 4);
 
+    // create the measurement with a Map as a triage value; this is not allowed
     ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
             .withEntity("entity")
             .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(triageValues)
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
+            .withTriageValues(triageValues);
 
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
 
-    // only the triage expression value itself should have been skipped, all others should
be there
-    JSONObject actual = (JSONObject) values.get(0);
-    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
-    assertEquals(measurement.getEntity(), actual.get("entity"));
-    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
-    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
-    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
-    assertNotNull(actual.get("timestamp"));
-    assertEquals("profiler", actual.get("source.type"));
+    // validate the core parts of the message still exist
+    assertEquals(measurement.getProfileName(),                    actual.get("profile"));
+    assertEquals(measurement.getEntity(),                         actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(),             actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(),    actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(),      actual.get("period.end"));
+    assertEquals("profiler",                                      actual.get("source.type"));
 
-    // the invalid expression should be skipped due to invalid type
+    // the invalid expression should be skipped and not included in the message
     assertFalse(actual.containsKey("invalid"));
 
     // but the valid expression should still be there
@@ -156,19 +228,18 @@ public class KafkaEmitterTest {
    */
   @Test
   public void testIntegerIsValidType() throws Exception {
+
+    // create a measurement with a triage value that is an integer
     ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
             .withEntity("entity")
             .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", 123))
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
+            .withTriageValues(Collections.singletonMap("triage-key", 123));
 
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-    JSONObject actual = (JSONObject) values.get(0);
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
 
     // the triage expression is valid
     assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
@@ -180,25 +251,37 @@ public class KafkaEmitterTest {
    */
   @Test
   public void testStringIsValidType() throws Exception {
+
+    // create a measurement with a triage value that is a string
     ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
             .withEntity("entity")
             .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", "value"))
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
+            .withTriageValues(Collections.singletonMap("triage-key", "value"));
 
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-    JSONObject actual = (JSONObject) values.get(0);
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
 
     // the triage expression is valid
     assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
   }
 
   /**
+   * Verifies that the KafkaEmitter does emit a JSONObject.
+   * @return The JSONObject that was emitted
+   */
+  private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, OutputCollector collector)
{
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    return (JSONObject) values.get(0);
+  }
+
+  /**
    * Creates a profile definition based on a string of JSON.
    * @param json The string of JSON.
    */


Mime
View raw message