metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [metron] branch master updated: METRON-1974 Batch Profiler Should Handle Errant Profiles Better (nickwallen) closes apache/metron#1326
Date Tue, 12 Feb 2019 17:25:44 GMT
This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cf78d2  METRON-1974 Batch Profiler Should Handle Errant Profiles Better (nickwallen) closes apache/metron#1326
0cf78d2 is described below

commit 0cf78d223f24eaf7112bfb7fb52d8778f8a1a2fa
Author: nickwallen <nick@nickallen.org>
AuthorDate: Tue Feb 12 12:25:29 2019 -0500

    METRON-1974 Batch Profiler Should Handle Errant Profiles Better (nickwallen) closes apache/metron#1326
---
 .../spark/function/GroupByPeriodFunction.java      | 50 ++++++++++++++++-
 .../spark/function/ProfileBuilderFunction.java     | 31 ++++++++---
 .../spark/BatchProfilerIntegrationTest.java        | 59 +++++++++++++++-----
 .../spark/function/GroupByPeriodFunctionTest.java  | 63 ++++++++++++++++++++++
 .../spark/function/ProfileBuilderFunctionTest.java | 59 ++++++++++++++++----
 5 files changed, 229 insertions(+), 33 deletions(-)

diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
index 1b602f4..d06f7e9 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
@@ -47,6 +47,8 @@ public class GroupByPeriodFunction implements MapFunction<MessageRoute, String>
    */
   private TimeUnit periodDurationUnits;
 
+  private static final String SEPARATOR = "__";
+
   public GroupByPeriodFunction(Properties profilerProperties) {
     periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(profilerProperties,
String.class));
     periodDuration = PERIOD_DURATION.get(profilerProperties, Integer.class);
@@ -55,6 +57,52 @@ public class GroupByPeriodFunction implements MapFunction<MessageRoute, String>
   @Override
   public String call(MessageRoute route) {
     ProfilePeriod period = ProfilePeriod.fromTimestamp(route.getTimestamp(), periodDuration,
periodDurationUnits);
-    return route.getProfileDefinition().getProfile() + "-" + route.getEntity() + "-" + period.getPeriod();
+    return new StringBuilder()
+            .append(route.getProfileDefinition().getProfile())
+            .append(SEPARATOR)
+            .append(route.getEntity())
+            .append(SEPARATOR)
+            .append(period.getPeriod())
+            .toString();
+  }
+
+  /**
+   * @param groupKey The group key used to group {@link MessageRoute}s.
+   * @return The name of the profile.
+   */
+  public static String profileFromKey(String groupKey) {
+    String[] pieces = groupKey.split(SEPARATOR);
+    if(pieces.length == 3) {
+      return pieces[0];
+    } else {
+      return "unknown";
+    }
+  }
+
+  /**
+   * @param groupKey The group key used to group {@link MessageRoute}s.
+   * @return The name of the entity.
+   */
+  public static String entityFromKey(String groupKey) {
+    String[] pieces = groupKey.split(SEPARATOR);
+    if(pieces.length == 3) {
+      return pieces[1];
+    } else {
+      return "unknown";
+    }
   }
+
+  /**
+   * @param groupKey The group key used to group {@link MessageRoute}s.
+   * @return The period identifier.
+   */
+  public static String periodFromKey(String groupKey) {
+    String[] pieces = groupKey.split(SEPARATOR);
+    if(pieces.length == 3) {
+      return pieces[2];
+    } else {
+      return "unknown";
+    }
+  }
+
 }
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
index 273695b..7283b48 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
@@ -39,9 +39,13 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static java.lang.String.format;
 import static java.util.Comparator.comparing;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+import static org.apache.metron.profiler.spark.function.GroupByPeriodFunction.entityFromKey;
+import static org.apache.metron.profiler.spark.function.GroupByPeriodFunction.periodFromKey;
+import static org.apache.metron.profiler.spark.function.GroupByPeriodFunction.profileFromKey;
 
 /**
  * The function responsible for building profiles in Spark.
@@ -70,7 +74,7 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
    * @return
    */
   @Override
-  public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator)
throws Exception {
+  public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator)
{
     // create the distributor; some settings are unnecessary because it is cleaned-up immediately
after processing the batch
     int maxRoutes = Integer.MAX_VALUE;
     long profileTTLMillis = Long.MAX_VALUE;
@@ -89,15 +93,28 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
     }
 
     // flush the profile
+    ProfileMeasurementAdapter result;
     List<ProfileMeasurement> measurements = distributor.flush();
-    if(measurements.size() > 1) {
-      throw new IllegalStateException("No more than 1 profile measurement is expected");
+    if(measurements.size() == 1) {
+      ProfileMeasurement m = measurements.get(0);
+      result = new ProfileMeasurementAdapter(m);
+      LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}",
+              m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue());
+
+    } else if(measurements.size() == 0) {
+      String msg = format("No profile measurement can be calculated. Review the profile for
bugs. profile=%s, entity=%s, period=%s",
+              profileFromKey(group), entityFromKey(group), periodFromKey(group));
+      LOG.error(msg);
+      throw new IllegalStateException(msg);
+
+    } else {
+      String msg = format("Expected 1 profile measurement, but got %d. profile=%s, entity=%s,
period=%s",
+              measurements.size(), profileFromKey(group), entityFromKey(group), periodFromKey(group));
+      LOG.error(msg);
+      throw new IllegalStateException(msg);
     }
 
-    ProfileMeasurement m = measurements.get(0);
-    LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}",
-            m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue());
-    return new ProfileMeasurementAdapter(m);
+    return result;
   }
 
   private static <T> Stream<T> toStream(Iterator<T> iterator) {
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 83800af..9ea151a 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -20,7 +20,6 @@
 package org.apache.metron.profiler.spark;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.profiler.client.stellar.FixedLookback;
 import org.apache.metron.profiler.client.stellar.GetProfile;
@@ -30,6 +29,7 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
@@ -41,13 +41,13 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.metron.common.configuration.profiler.ProfilerConfig.fromJSON;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
@@ -59,10 +59,11 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_READER;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
 import static org.junit.Assert.assertTrue;
 
-import static org.apache.metron.profiler.spark.reader.TelemetryReaders.*;
-
 /**
  * An integration test for the {@link BatchProfiler}.
  */
@@ -166,7 +167,7 @@ public class BatchProfilerIntegrationTest {
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
 
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(profileJson));
 
     validateProfiles();
   }
@@ -188,7 +189,7 @@ public class BatchProfilerIntegrationTest {
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
 
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(profileJson));
 
     validateProfiles();
   }
@@ -210,7 +211,7 @@ public class BatchProfilerIntegrationTest {
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), inputPath);
 
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(profileJson));
 
     validateProfiles();
   }
@@ -239,7 +240,7 @@ public class BatchProfilerIntegrationTest {
     readerProperties.put("header", "true");
 
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(profileJson));
 
     validateProfiles();
   }
@@ -255,7 +256,7 @@ public class BatchProfilerIntegrationTest {
     profilerProperties.put(TELEMETRY_INPUT_END.getKey(), "2018-07-07T15:51:48Z");
 
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(profileJson));
 
     // the max timestamp in the data is around July 7, 2018
     assign("maxTimestamp", "1530978728982L");
@@ -279,7 +280,7 @@ public class BatchProfilerIntegrationTest {
     profilerProperties.put(TELEMETRY_INPUT_END.getKey(), "");
 
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(profileJson));
 
     // the max timestamp in the data is around July 7, 2018
     assign("maxTimestamp", "1530978728982L");
@@ -293,6 +294,40 @@ public class BatchProfilerIntegrationTest {
   }
 
   /**
+   * {
+   *   "timestampField": "timestamp",
+   *   "profiles": [
+   *      {
+   *        "profile": "count-by-ip",
+   *        "foreach": "ip_src_addr",
+   *        "init": { "count": 0 },
+   *        "update": { "count" : "count + 1" },
+   *        "result": "count"
+   *      },
+   *      {
+   *        "profile": "invalid-profile",
+   *        "foreach": "'total'",
+   *        "init": { "count": 0 },
+   *        "update": { "count": "count + 1" },
+   *        "result": "INVALID_FUNCTION(count)"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private static String invalidProfileJson;
+
+  @Test(expected = SparkException.class)
+  public void testBatchProfilerWithInvalidProfile() throws Exception {
+    profilerProperties.put(TELEMETRY_INPUT_READER.getKey(), JSON.toString());
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+
+    // the batch profiler should error out, if there is a bug in *any* of the profiles
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(invalidProfileJson));
+  }
+
+  /**
    * Validates the profiles that were built.
    *
    * These tests use the Batch Profiler to seed two profiles with archived telemetry.  The
first profile
@@ -316,10 +351,6 @@ public class BatchProfilerIntegrationTest {
     assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)", Boolean.class));
   }
 
-  private ProfilerConfig getProfile() throws IOException {
-    return ProfilerConfig.fromJSON(profileJson);
-  }
-
   private Properties getGlobals() {
     return new Properties();
   }
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunctionTest.java
new file mode 100644
index 0000000..78960c4
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunctionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class GroupByPeriodFunctionTest {
+
+  /**
+   * {
+   *    "profile": "my-profile-name",
+   *    "foreach": "'total'",
+   *    "init": { "count": 0 },
+   *    "update": { "count": "count + 1" },
+   *    "result": "count"
+   * }
+   */
+  @Multiline
+  private String profileJSON;
+
+  @Test
+  public void shouldDecodeGroupKey() throws Exception {
+    final ProfileConfig profile = ProfileConfig.fromJSON(profileJSON);
+    final Long timestamp = System.currentTimeMillis();
+    final String entity = "192.168.1.1";
+    final JSONObject message = new JSONObject();
+    final String periodId = new Long(ProfilePeriod.fromTimestamp(timestamp, 15, TimeUnit.MINUTES).getPeriod()).toString();
+
+    MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
+    String groupKey = new GroupByPeriodFunction(new Properties()).call(route);
+
+    // should be able to extract the profile, entity and period from the group key
+    Assert.assertEquals("my-profile-name", GroupByPeriodFunction.profileFromKey(groupKey));
+    Assert.assertEquals(entity, GroupByPeriodFunction.entityFromKey(groupKey));
+    Assert.assertEquals(periodId, GroupByPeriodFunction.periodFromKey(groupKey));
+  }
+
+}
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
index d5a4dba..9e1e0b3 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
@@ -19,6 +19,7 @@
  */
 package org.apache.metron.profiler.spark.function;
 
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.ProfilePeriod;
@@ -39,13 +40,25 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATI
 
 public class ProfileBuilderFunctionTest {
 
+  /**
+   * {
+   *    "profile": "total-count",
+   *    "foreach": "'total'",
+   *    "init": { "count": 0 },
+   *    "update": { "count": "count + 1" },
+   *    "result": "count"
+   * }
+   */
+  @Multiline
+  private String profileJSON;
+
   @Test
-  public void testBuildProfile() throws Exception {
+  public void shouldBuildProfileMeasurement() throws Exception {
     // setup the message and profile
     JSONObject message = getMessage();
     String entity = "192.168.1.1";
     long timestamp = (Long) message.get("timestamp");
-    ProfileConfig profile = getProfile();
+    ProfileConfig profile = ProfileConfig.fromJSON(profileJSON);
 
     // setup the route
     MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
@@ -71,6 +84,39 @@ public class ProfileBuilderFunctionTest {
     Assert.assertEquals(expectedPeriod.getPeriod(), (long) measurement.getPeriodId());
   }
 
+  /**
+   * {
+   *    "profile": "total-count",
+   *    "foreach": "'total'",
+   *    "init": { "count": 0 },
+   *    "update": { "count": "count + 1" },
+   *    "result": "INVALID_FUNCTION(count)"
+   * }
+   */
+  @Multiline
+  private static String invalidProfileJson;
+
+  @Test(expected = IllegalStateException.class)
+  public void shouldThrowExceptionIfInvalidProfile() throws Exception {
+    // setup the message and profile
+    JSONObject message = getMessage();
+    String entity = "192.168.1.1";
+    long timestamp = (Long) message.get("timestamp");
+    ProfileConfig profile = ProfileConfig.fromJSON(invalidProfileJson);
+
+    // setup the route
+    MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
+    List<MessageRoute> routes = new ArrayList();
+    routes.add(route);
+    routes.add(route);
+    routes.add(route);
+    Properties profilerProperties = getProfilerProperties();
+
+    // an exception should be thrown, if there is a bug in the profile definition
+    ProfileBuilderFunction function = new ProfileBuilderFunction(profilerProperties, getGlobals());
+    ProfileMeasurementAdapter measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
+  }
+
   private JSONObject getMessage() {
     JSONObject message = new JSONObject();
     message.put("ip_src_addr", "192.168.1.1");
@@ -86,13 +132,4 @@ public class ProfileBuilderFunctionTest {
   private Map<String, String> getGlobals() {
     return Collections.emptyMap();
   }
-
-  private ProfileConfig getProfile() {
-    return new ProfileConfig()
-            .withProfile("profile1")
-            .withForeach("ip_src_addr")
-            .withUpdate("count", "count + 1")
-            .withResult("count");
-
-  }
 }


Mime
View raw message