metron-commits mailing list archives

From nickal...@apache.org
Subject incubator-metron git commit: METRON-367 Enhance Profiler to Support Multiple Numeric Types (nickwallen) closes apache/incubator-metron#212
Date Tue, 23 Aug 2016 01:10:05 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 443e7ced3 -> 3acb21653


METRON-367 Enhance Profiler to Support Multiple Numeric Types (nickwallen) closes apache/incubator-metron#212


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/3acb2165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/3acb2165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/3acb2165

Branch: refs/heads/master
Commit: 3acb216539f8d757e52eae021838d9e56e379cc4
Parents: 443e7ce
Author: nickwallen <nick@nickallen.org>
Authored: Mon Aug 22 21:09:28 2016 -0400
Committer: Nick Allen <nick@nickallen.org>
Committed: Mon Aug 22 21:09:28 2016 -0400

----------------------------------------------------------------------
 metron-analytics/metron-profiler/README.md      |  14 +-
 .../metron/profiler/ProfileMeasurement.java     |  18 +-
 .../profiler/bolt/ProfileBuilderBolt.java       |   6 +-
 .../profiler/bolt/ProfileHBaseMapper.java       |  34 +++-
 .../zookeeper/readme-example-3/profiler.json    |   2 +-
 .../zookeeper/write-integer/profiler.json       |  13 ++
 .../profiler/bolt/ProfileBuilderBoltTest.java   |   4 +-
 .../integration/ProfilerIntegrationTest.java    | 170 +++++++++++++------
 .../util/DefaultStellarExecutorTest.java        |   1 -
 9 files changed, 184 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index 2ad9c6f..f1e73f8 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -21,7 +21,7 @@ The Profiler configuration requires a JSON-formatted set of elements, many of wh
 | update  	| Required  	| A set of expressions that is executed when a message is applied to the profile.  A map is expected where the key is the variable name and the value is a Stellar expression.  The map can include 0 or more variables/expressions.  	    |
 | result  	| Required  	| A Stellar expression that is executed when the window period expires.  The expression is expected to in some way summarize the messages that were applied to the profile over the window period.  The expression must result in a numeric value such as a Double, Long, Float, Short, or Integer.  	    |
 
-#### Examples
+### Examples
 
 Examples of the types of profiles that can be built include the following.  Each shows the configuration that would be required to produce the profile.  These examples assume a fictitious input message that looks something like the following.
 
@@ -46,6 +46,7 @@ Examples of the types of profiles that can be built include the following.  Each
 }
 ```
 
+
 #### Example 1
 
 The total number of bytes of HTTP data for each host. The following configuration would be used to generate this profile.
@@ -114,7 +115,7 @@ This creates a profile...
 
 #### Example 3
 
-The average response body length of HTTP traffic. The following configuration would be used to generate this profile.
+The average of the `length` field of HTTP traffic. The following configuration would be used to generate this profile.
 
 ```
 {
@@ -129,7 +130,7 @@ The average response body length of HTTP traffic. The following configuration wo
         "cnt": 0.0
       },
       "update": {
-        "sum": "sum + resp_body_len",
+        "sum": "sum + length",
         "cnt": "cnt + 1"
       },
       "result": "sum / cnt"
@@ -142,7 +143,7 @@ This creates a profile...
  * Named ‘example3’
  * That for each IP source address
  * Only if the 'protocol' field equals 'HTTP'
- * Accumulates the sum of response body length
+ * Accumulates the sum of length
  * Accumulates the number of messages
  * Calculates the average as the result
 
@@ -161,7 +162,6 @@ The Profiler topology also accepts the following configuration settings.
 | profiler.hbase.batch | The number of puts that are written in a single batch.  |
 | profiler.hbase.flush.interval.seconds | The maximum number of seconds between batch writes to HBase. |
 
-
 ## Getting Started
 
 This section will describe the steps required to get your first profile running.
@@ -228,10 +228,10 @@ This section will describe the steps required to get your first profile running.
 
 ## Implementation
 
-The Profiler is implemented as a Storm topology using the following bolts and spouts.
-
 ## Topology
 
+The Profiler is implemented as a Storm topology using the following bolts and spouts.
+
 ### KafkaSpout
 
 A spout that consumes messages from a single Kafka topic.  In most cases, the Profiler topology will consume messages from the `indexing` topic.  This topic contains fully enriched messages that are ready to be indexed.  This ensures that profiles can take advantage of all the available data elements.
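
The init/update/result lifecycle that README Example 3 describes can be sketched in plain Java. This is only an illustration: the real Profiler evaluates Stellar expressions, the class and method names below are hypothetical, and the sketch assumes that `sum` starts at 0.0 like `cnt` does (the `sum` initializer is not visible in the hunk above).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Profiler's init/update/result lifecycle.
// The real Profiler evaluates Stellar expressions rather than Java code.
class AverageLengthSketch {

  // Mirrors Example 3: initialize sum and cnt, apply the "update"
  // expressions once per message, then evaluate "result".
  static double averageLength(List<Integer> lengths) {
    double sum = 0.0;           // assumed "init" value for sum
    double cnt = 0.0;           // "init": { "cnt": 0.0 }
    for (int length : lengths) {
      sum = sum + length;       // "update": "sum + length"
      cnt = cnt + 1;            // "update": "cnt + 1"
    }
    return sum / cnt;           // "result": "sum / cnt"
  }

  public static void main(String[] args) {
    // Five messages, each with a length of 20, as in the integration test.
    System.out.println(averageLength(Arrays.asList(20, 20, 20, 20, 20)));
  }
}
```

Because both variables start as doubles, the result of this sketch is a double as well, which matches the integration test reading Example 3's value back with `Bytes.toDouble`.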

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index e240567..6db76fe 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -51,7 +51,7 @@ public class ProfileMeasurement {
   /**
    * The actual measurement itself.
    */
-  private double value;
+  private Object value;
 
   public String getProfileName() {
     return profileName;
@@ -85,11 +85,11 @@ public class ProfileMeasurement {
     this.end = end;
   }
 
-  public double getValue() {
+  public Object getValue() {
     return value;
   }
 
-  public void setValue(double value) {
+  public void setValue(Object value) {
     this.value = value;
   }
 
@@ -102,21 +102,19 @@ public class ProfileMeasurement {
 
     if (start != that.start) return false;
     if (end != that.end) return false;
-    if (Double.compare(that.value, value) != 0) return false;
     if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
-    return entity != null ? entity.equals(that.entity) : that.entity == null;
+    if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
+    return value != null ? value.equals(that.value) : that.value == null;
+
   }
 
   @Override
   public int hashCode() {
-    int result;
-    long temp;
-    result = profileName != null ? profileName.hashCode() : 0;
+    int result = profileName != null ? profileName.hashCode() : 0;
     result = 31 * result + (entity != null ? entity.hashCode() : 0);
     result = 31 * result + (int) (start ^ (start >>> 32));
     result = 31 * result + (int) (end ^ (end >>> 32));
-    temp = Double.doubleToLongBits(value);
-    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    result = 31 * result + (value != null ? value.hashCode() : 0);
     return result;
   }
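
One consequence of switching `value` from `double` to `Object` is that `equals()` now compares boxed values, so two measurements holding the same number in different numeric types are no longer equal. The sketch below illustrates this with a hypothetical helper class that is not part of Metron; `Objects.equals` is used as a null-safe equivalent of the ternary in the new `equals()`.

```java
import java.util.Objects;

// Hypothetical demo class; not part of Metron.
class ValueEqualitySketch {

  // Null-safe equality, equivalent to:
  //   value != null ? value.equals(that.value) : that.value == null
  static boolean sameValue(Object a, Object b) {
    return Objects.equals(a, b);
  }

  public static void main(String[] args) {
    System.out.println(sameValue(10, 10));      // Integer vs Integer
    System.out.println(sameValue(10.0, 10.0));  // Double vs Double
    System.out.println(sameValue(10, 10.0));    // Integer vs Double
    System.out.println(sameValue(10L, 10));     // Long vs Integer
  }
}
```

The first two comparisons are true and the last two are false, because `Integer.equals`, `Long.equals`, and `Double.equals` each require the argument to be of the same boxed type.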
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index d6656ac..fa25149 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -38,6 +38,7 @@ import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Map;
 
 import static java.lang.String.format;
@@ -220,10 +221,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
             measurement.getProfileName(), measurement.getEntity(), measurement.getStart()));
 
     // execute the 'result' expression
-    Double result;
+    Object result;
     try {
       String resultExpr = profileConfig.getResult();
-       result = executor.execute(resultExpr, new JSONObject(), Double.class);
+      result = executor.execute(resultExpr, new JSONObject(), Object.class);
+
     } catch(ParseException e) {
       throw new ParseException("Bad 'result' expression", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index 1c515be..492437e 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -40,7 +40,7 @@ public class ProfileHBaseMapper implements HBaseMapper {
    * to generate the salt.  Ideally, this constant should be roughly equal to the number of
    * nodes in the Hbase cluster.
    */
-  private int saltDivisor = 10;
+  private int saltDivisor;
 
   /**
    * The name of the column family.
@@ -49,6 +49,7 @@ public class ProfileHBaseMapper implements HBaseMapper {
 
   public ProfileHBaseMapper() {
     setColumnFamily("P");
+    setSaltDivisor(1000);
   }
 
   /**
@@ -88,12 +89,41 @@ public class ProfileHBaseMapper implements HBaseMapper {
     cols.addColumn(cfBytes, QENTITY, Bytes.toBytes(measurement.getEntity()));
     cols.addColumn(cfBytes, QSTART, Bytes.toBytes(measurement.getStart()));
     cols.addColumn(cfBytes, QEND, Bytes.toBytes(measurement.getEnd()));
-    cols.addColumn(cfBytes, QVALUE, Bytes.toBytes(measurement.getValue()));
+    cols.addColumn(cfBytes, QVALUE, toBytes(measurement.getValue()));
 
     return cols;
   }
 
   /**
+   * Serialize a profile measurement's value.
+   *
+   * The value produced by a Profile definition can be any numeric data type.  The data
+   * type depends on how the profile is defined by the user.  The user should be able to
+   * choose the data type that is most suitable for their use case.
+   *
+   * @param value The value to serialize.
+   */
+  private byte[] toBytes(Object value) {
+    byte[] result;
+
+    if(value instanceof Integer) {
+      result = Bytes.toBytes((Integer) value);
+    } else if(value instanceof Double) {
+      result = Bytes.toBytes((Double) value);
+    } else if(value instanceof Short) {
+      result = Bytes.toBytes((Short) value);
+    } else if(value instanceof Long) {
+      result = Bytes.toBytes((Long) value);
+    } else if(value instanceof Float) {
+      result = Bytes.toBytes((Float) value);
+    } else {
+      throw new RuntimeException("Expected 'Number': actual=" + value);
+    }
+
+    return result;
+  }
+
+  /**
    * Calculates a salt value that is used as part of the row key.
    *
    * The salt is calculated as 'md5(timestamp) % N' where N is a configurable value that ideally
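
The type dispatch added in `toBytes(Object)` can be tried without HBase on the classpath. The sketch below is a stand-in under stated assumptions: it uses `java.nio.ByteBuffer` rather than HBase's `Bytes`, the class name is hypothetical, and the byte layout is not guaranteed to match what `Bytes.toBytes` produces.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for ProfileHBaseMapper.toBytes(Object). It uses
// java.nio.ByteBuffer instead of HBase's Bytes so it runs without HBase.
class NumericBytesSketch {

  // Serialize a boxed numeric value according to its runtime type.
  static byte[] toBytes(Object value) {
    if (value instanceof Integer) {
      return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
    } else if (value instanceof Double) {
      return ByteBuffer.allocate(Double.BYTES).putDouble((Double) value).array();
    } else if (value instanceof Short) {
      return ByteBuffer.allocate(Short.BYTES).putShort((Short) value).array();
    } else if (value instanceof Long) {
      return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
    } else if (value instanceof Float) {
      return ByteBuffer.allocate(Float.BYTES).putFloat((Float) value).array();
    }
    throw new IllegalArgumentException("Expected a supported Number: actual=" + value);
  }

  public static void main(String[] args) {
    // The encoded width depends on the type; no type tag is stored.
    System.out.println(toBytes(10).length);
    System.out.println(toBytes(10L).length);
    System.out.println(toBytes(10.0).length);

    // The reader has to pick the decoder that matches the writer's type.
    System.out.println(ByteBuffer.wrap(toBytes(10)).getInt());
  }
}
```

Since nothing in the encoded bytes records the type, a reader has to know which type the profile's `result` expression produces. The integration test in this commit does exactly that with separate `readDouble` and `readInteger` helpers.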

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
index 96c5935..ce9d0e0 100644
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
@@ -10,7 +10,7 @@
         "cnt": 0.0
       },
       "update": {
-        "sum": "sum + resp_body_len",
+        "sum": "sum + length",
         "cnt": "cnt + 1"
       },
       "result": "sum / cnt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
new file mode 100644
index 0000000..ae5e6c7
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
@@ -0,0 +1,13 @@
+{
+  "inputTopic": "indexing",
+  "profiles": [
+    {
+      "profile": "example1",
+      "foreach": "ip_src_addr",
+      "onlyif": "true",
+      "init": {},
+      "update": {},
+      "result": "TO_INTEGER(10.0)"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 1528e31..c6745f8 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -64,8 +64,8 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
    *   "foreach": "ip_src_addr",
    *   "onlyif": "true",
    *   "init": {
-   *     "x": 10,
-   *     "y": 20
+   *     "x": "10",
+   *     "y": "20"
    *   },
    *   "update": {
    *     "x": "x + 10",

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index d0d22b8..64d5bd8 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -23,7 +23,7 @@ package org.apache.metron.profiler.integration;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
@@ -36,12 +36,16 @@ import org.apache.metron.integration.components.KafkaWithZKComponent;
 import org.apache.metron.profiler.bolt.ProfileHBaseMapper;
 import org.apache.metron.test.mock.MockHTable;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -112,6 +116,117 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  /**
+   * Tests the first example contained within the README.
+   */
+  @Test
+  public void testExample1() throws Exception {
+
+    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+
+    // verify - ensure the profile is being persisted
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
+            timeout(seconds(90)));
+
+    // verify - there are 5 'HTTP' each with 390 bytes
+    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    Assert.assertEquals(390.0 * 5, actual, 0.01);
+  }
+
+  /**
+   * Tests the second example contained within the README.
+   */
+  @Test
+  public void testExample2() throws Exception {
+
+    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+
+    // verify - ensure the profile is being persisted
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
+            timeout(seconds(90)));
+
+    // verify - there are 5 'HTTP' and 5 'DNS' messages thus 5/5 = 1
+    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    Assert.assertEquals(5.0 / 5.0, actual, 0.01);
+  }
+
+  /**
+   * Tests the third example contained within the README.
+   */
+  @Test
+  public void testExample3() throws Exception {
+
+    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+
+    // verify - ensure the profile is being persisted
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
+            timeout(seconds(90)));
+
+    // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
+    double actual = readDouble(ProfileHBaseMapper.QVALUE);
+    Assert.assertEquals(20.0, actual, 0.01);
+  }
+
+  @Test
+  public void testWriteInteger() throws Exception {
+
+    setup(TEST_RESOURCES + "/config/zookeeper/write-integer");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+
+    // verify - ensure the profile is being persisted
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
+            timeout(seconds(90)));
+
+    // verify - the 'result' expression is TO_INTEGER(10.0), thus the stored integer should be 10
+    double actual = readInteger(ProfileHBaseMapper.QVALUE);
+    Assert.assertEquals(10.0, actual, 0.01);
+  }
+
+  /**
+   * Reads a Double value written by the Profiler.
+   * @param columnQual The column qualifier.
+   */
+  private Double readDouble(byte[] columnQual) throws IOException {
+    ResultScanner scanner = profilerTable.getScanner(Bytes.toBytes(columnFamily), columnQual);
+
+    for (Result result : scanner) {
+      byte[] raw = result.getValue(Bytes.toBytes(columnFamily), ProfileHBaseMapper.QVALUE);
+      return Bytes.toDouble(raw);
+    }
+
+    throw new IllegalStateException("No results found");
+  }
+
+  /**
+   * Reads an Integer value written by the Profiler.
+   * @param columnQual The column qualifier.
+   */
+  private Integer readInteger(byte[] columnQual) throws IOException {
+    ResultScanner scanner = profilerTable.getScanner(Bytes.toBytes(columnFamily), columnQual);
+
+    for (Result result : scanner) {
+      byte[] raw = result.getValue(Bytes.toBytes(columnFamily), ProfileHBaseMapper.QVALUE);
+      return Bytes.toInt(raw);
+    }
+
+    throw new IllegalStateException("No results found");
+  }
+
   public void setup(String pathToConfig) throws Exception {
 
     // create input messages for the profiler to consume
@@ -172,55 +287,4 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       runner.stop();
     }
   }
-
-  /**
-   * Tests the first example contained within the README.
-   */
-  @Test
-  public void testExample1() throws Exception {
-
-    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
-  }
-
-  /**
-   * Tests the second example contained within the README.
-   */
-  @Test
-  public void testExample2() throws Exception {
-
-    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
-  }
-
-  /**
-   * Tests the third example contained within the README.
-   */
-  @Test
-  public void testExample3() throws Exception {
-
-    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
-
-    // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3acb2165/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
index 68085d5..534c724 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/util/DefaultStellarExecutorTest.java
@@ -157,5 +157,4 @@ public class DefaultStellarExecutorTest {
     executor.execute("2", message, Short.class);
     executor.execute("2", message, Long.class);
   }
-
 }

