metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [metron] branch master updated: METRON-1970 Add Metadata to Error Messages Generated During Parsing (nickwallen) closes apache/metron#1325
Date Tue, 12 Feb 2019 17:18:39 GMT
This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b6ef88  METRON-1970 Add Metadata to Error Messages Generated During Parsing (nickwallen)
closes apache/metron#1325
1b6ef88 is described below

commit 1b6ef88c79d60022542cda7e9abbea7e720773cc
Author: nickwallen <nick@nickallen.org>
AuthorDate: Tue Feb 12 12:18:09 2019 -0500

    METRON-1970 Add Metadata to Error Messages Generated During Parsing (nickwallen) closes
apache/metron#1325
---
 .../java/org/apache/metron/common/Constants.java   |  1 +
 .../apache/metron/common/error/MetronError.java    | 79 +++++++++++-----------
 .../metron/common/error/MetronErrorTest.java       | 47 +++++++++++++
 metron-platform/metron-parsing/README.md           | 51 ++++++++++----
 .../apache/metron/parsers/ParserRunnerImpl.java    |  3 +
 .../metron/parsers/ParserRunnerImplTest.java       | 16 ++++-
 6 files changed, 141 insertions(+), 56 deletions(-)

diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 5054508..a0b5bce 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -93,6 +93,7 @@ public class Constants {
     ,RAW_MESSAGE_BYTES("raw_message_bytes")
     ,ERROR_FIELDS("error_fields")
     ,ERROR_HASH("error_hash")
+    ,METADATA("metadata")
     ;
 
     private String name;
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
index 0493be6..89044de 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -25,10 +25,14 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
@@ -45,6 +49,7 @@ public class MetronError {
   private ErrorType errorType = ErrorType.DEFAULT_ERROR;
   private Set<String> errorFields;
   private List<Object> rawMessages;
+  private Map<String, Object> metadata = new HashMap<>();
 
   public MetronError withMessage(String message) {
     this.message = message;
@@ -71,6 +76,10 @@ public class MetronError {
     return this;
   }
 
+  public MetronError withMetadata(Map<String, Object> metadata) {
+    this.metadata.putAll(metadata);
+    return this;
+  }
 
   public MetronError addRawMessage(Object rawMessage) {
     if (rawMessage != null) {
@@ -95,25 +104,28 @@ public class MetronError {
   public JSONObject getJSONObject() {
     JSONObject errorMessage = new JSONObject();
     errorMessage.put(Constants.GUID, UUID.randomUUID().toString());
-    errorMessage.put(Constants.SENSOR_TYPE, "error");
-    if (sensorTypes.size() == 1) {
-      errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorTypes.iterator().next());
-    } else {
-      errorMessage
-          .put(ErrorFields.FAILED_SENSOR_TYPE.getName(), new JSONArray().addAll(sensorTypes));
-    }
+    errorMessage.put(Constants.SENSOR_TYPE, Constants.ERROR_TYPE);
     errorMessage.put(ErrorFields.ERROR_TYPE.getName(), errorType.getType());
-
+    addFailedSensorType(errorMessage);
     addMessageString(errorMessage);
 		addStacktrace(errorMessage);
     addTimestamp(errorMessage);
     addHostname(errorMessage);
     addRawMessages(errorMessage);
     addErrorHash(errorMessage);
+    addMetadata(errorMessage);
 
     return errorMessage;
   }
 
+  private void addFailedSensorType(JSONObject errorMessage) {
+    if (sensorTypes.size() == 1) {
+      errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorTypes.iterator().next());
+    } else {
+      errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), new JSONArray().addAll(sensorTypes));
+    }
+  }
+
   @SuppressWarnings({"unchecked"})
   private void addMessageString(JSONObject errorMessage) {
     if (message != null) {
@@ -192,44 +204,31 @@ public class MetronError {
     }
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
+  private void addMetadata(JSONObject errorMessage) {
+    if(metadata != null && metadata.keySet().size() > 0) {
+      // add each metadata element directly to the message. each metadata key already has
+      // a standard prefix, no need to add another prefix to avoid collisions. this mimics
+      // the behavior of merging metadata.
+      errorMessage.putAll(metadata);
     }
+  }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof MetronError)) return false;
     MetronError that = (MetronError) o;
-
-    if (message != null ? !message.equals(that.message) : that.message != null) {
-      return false;
-    }
-    if (getThrowable() != null ? !getThrowable().equals(that.getThrowable())
-        : that.getThrowable() != null) {
-      return false;
-    }
-    if (sensorTypes != null ? !sensorTypes.equals(that.sensorTypes) : that.sensorTypes !=
null) {
-      return false;
-    }
-    if (errorType != that.errorType) {
-      return false;
-    }
-    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields !=
null) {
-      return false;
-    }
-    return rawMessages != null ? rawMessages.equals(that.rawMessages) : that.rawMessages
== null;
+    return Objects.equals(message, that.message) &&
+            Objects.equals(throwable, that.throwable) &&
+            Objects.equals(sensorTypes, that.sensorTypes) &&
+            errorType == that.errorType &&
+            Objects.equals(errorFields, that.errorFields) &&
+            Objects.equals(rawMessages, that.rawMessages) &&
+            Objects.equals(metadata, that.metadata);
   }
 
   @Override
   public int hashCode() {
-    int result = message != null ? message.hashCode() : 0;
-    result = 31 * result + (getThrowable() != null ? getThrowable().hashCode() : 0);
-    result = 31 * result + (sensorTypes != null ? sensorTypes.hashCode() : 0);
-    result = 31 * result + (errorType != null ? errorType.hashCode() : 0);
-    result = 31 * result + (errorFields != null ? errorFields.hashCode() : 0);
-    result = 31 * result + (rawMessages != null ? rawMessages.hashCode() : 0);
-    return result;
+    return Objects.hash(message, throwable, sensorTypes, errorType, errorFields, rawMessages,
metadata);
   }
 }
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
index 177a232..294d6dc 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.common.error;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
@@ -25,6 +26,9 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.json.simple.JSONObject;
@@ -118,4 +122,47 @@ public class MetronErrorTest {
     assertEquals(Sets.newHashSet("field1", "field2"), Sets.newHashSet(((String) errorJSON.get(Constants.ErrorFields.ERROR_FIELDS.getName())).split(",")));
     assertEquals("04a2629c39e098c3944be85f35c75876598f2b44b8e5e3f52c59fa1ac182817c", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
   }
+
+  @Test
+  public void shouldIncludeMessageMetadata() {
+    // the metadata that should be included in the error message
+    Map<String, Object> metadata = new HashMap<>();
+    metadata.put("metron.metadata.topic", "bro");
+    metadata.put("metron.metadata.partition", 0);
+    metadata.put("metron.metadata.offset", 123);
+
+    JSONObject message = new JSONObject();
+    message.put("field1", "value1");
+    message.put("field2", "value2");
+
+    MetronError error = new MetronError()
+            .addRawMessage(message)
+            .withMetadata(metadata);
+
+    // expect the metadata to be flattened and folded into the error message
+    JSONObject errorMessage = error.getJSONObject();
+    assertEquals("bro", errorMessage.get("metron.metadata.topic"));
+    assertEquals(0, errorMessage.get("metron.metadata.partition"));
+    assertEquals(123, errorMessage.get("metron.metadata.offset"));
+  }
+
+  @Test
+  public void shouldNotIncludeEmptyMetadata() {
+    // there is no metadata
+    Map<String, Object> metadata = new HashMap<>();
+
+    JSONObject message = new JSONObject();
+    message.put("field1", "value1");
+    message.put("field2", "value2");
+
+    MetronError error = new MetronError()
+            .addRawMessage(message)
+            .withMetadata(metadata);
+
+    // expect no metadata fields to be added to the error message
+    JSONObject errorMessage = error.getJSONObject();
+    assertFalse(errorMessage.containsKey("metron.metadata.topic"));
+    assertFalse(errorMessage.containsKey("metron.metadata.partition"));
+    assertFalse(errorMessage.containsKey("metron.metadata.offset"));
+  }
 }
diff --git a/metron-platform/metron-parsing/README.md b/metron-platform/metron-parsing/README.md
index 9bbd39f..b8f44cb 100644
--- a/metron-platform/metron-parsing/README.md
+++ b/metron-platform/metron-parsing/README.md
@@ -165,13 +165,13 @@ Errors, which are defined as unexpected exceptions happening during
the
 parse, are sent along to the error queue with a message indicating that
 there was an error in parse along with a stacktrace.  This is to
 distinguish from the invalid messages.
- 
+
 ## Filtered
 
 One can also filter a message by specifying a `filterClassName` in the
 parser config.  Filtered messages are just dropped rather than passed
 through.
-   
+
 ## Parser Architecture
 
 ![Architecture](parser_arch.png)
@@ -180,7 +180,7 @@ Data flows through the parser via kafka and into the `enrichments`
 topology in kafka.  Errors are collected with the context of the error
 (e.g. stacktrace) and original message causing the error and sent to an
 `error` queue.  Invalid messages as determined by global validation
-functions are also treated as errors and sent to an `error` queue. 
+functions are also treated as errors and sent to an `error` queue.
 
 ## Message Format
 
@@ -218,7 +218,7 @@ So putting it all together a typical Metron message with all 5-tuple fields
pres
 }
 ```
 
-## Global Configuration 
+## Global Configuration
 
 There are a few properties which can be managed in the global configuration that have pertinence
to
 parsers and parsing in general.
@@ -261,7 +261,7 @@ The document is structured in the following way
         }
         ```
 
-* `sensorTopic` : The kafka topic to send the parsed messages to.  If the topic is prefixed
and suffixed by `/` 
+* `sensorTopic` : The kafka topic to send the parsed messages to.  If the topic is prefixed
and suffixed by `/`
 then it is assumed to be a regex and will match any topic matching the pattern (e.g. `/bro.*/`
would match `bro_cust0`, `bro_cust1` and `bro_cust2`)
 * `readMetadata` : Boolean indicating whether to read metadata or not (The default is raw
message strategy dependent).  See below for a discussion about metadata.
 * `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not
(The default is raw message strategy dependent).  See below for a discussion about metadata.
@@ -291,7 +291,7 @@ then it is assumed to be a regex and will match any topic matching the
pattern (
         ```
 
 The `fieldTransformations` is a complex object which defines a
-transformation which can be done to a message.  This transformation can 
+transformation which can be done to a message.  This transformation can
 * Modify existing fields to a message
 * Add new fields given the values of existing fields of a message
 * Remove existing fields of a message
@@ -303,7 +303,7 @@ For platform specific configs, see the README of the appropriate project.
This w
 
 Metadata is a useful thing to send to Metron and use during enrichment or threat intelligence.
 
 Consider the following scenarios:
-* You have multiple telemetry sources of the same type that you want to 
+* You have multiple telemetry sources of the same type that you want to
     * ensure downstream analysts can differentiate
     * ensure profiles consider independently as they have different seasonality or some other
fundamental characteristic
 
@@ -311,7 +311,7 @@ As such, there are two types of metadata that we seek to support in Metron:
 * Environmental metadata : Metadata about the system at large
     * Consider the possibility that you have multiple kafka topics being processed by one
parser and you want to tag the messages with the kafka topic
     * At the moment, only the kafka topic is kept as the field name.
-* Custom metadata: Custom metadata from an individual telemetry source that one might want
to use within Metron. 
+* Custom metadata: Custom metadata from an individual telemetry source that one might want
to use within Metron.
 
 Metadata is controlled by the following parser configs:
 * `rawMessageStrategy` : This is a strategy which indicates how to read data and metadata.
 The strategies supported are:
@@ -324,7 +324,7 @@ Metadata is controlled by the following parser configs:
     * `ENVELOPE`
         * `metadataPrefix` defines the key prefix for metadata (default is `metron.metadata`)
         * `messageField` defines the field from the envelope to use as the data.  All other
fields are considered metadata.
-* `readMetadata` : This is a boolean indicating whether metadata will be read and made available
to Field 
+* `readMetadata` : This is a boolean indicating whether metadata will be read and made available
to Field
 transformations (i.e. Stellar field transformations).  The default is
 dependent upon the `rawMessageStrategy`:
     * `DEFAULT` : default to `false`.
@@ -350,7 +350,31 @@ For instance, sending a metadata field called `customer_id` could be
done by sen
 in the kafka key.  This would be exposed as the field `metron.metadata.customer_id` to stellar
field transformations
 as well, if `mergeMetadata` is `true`, available as a field in its own right.
 
+#### Metadata and Error Handling
+
+When a telemetry message fails to parse correctly, a separate error message is produced and sent to the error topic.  This error message contains detailed information about the error that occurred.
+
+If the telemetry message that failed contains metadata, this metadata is included in the error message.  For example, here is an error message that contains two metadata fields: `metron.metadata.topic` and `metron.metadata.customer`.
+
+```
+{
+  "exception": "java.lang.IllegalStateException: Unable to parse Message: \"this is an invalid
synthetic message\" }",
+  "stack": "java.lang.IllegalStateException: Unable to parse Message: \"this is an invalid
synthetic message\" ...\n",
+  "raw_message": "\"this is an invalid synthetic message\" }",
+  "error_hash": "3d498968e8df7f28d05db3037d4ad2a3a0095c22c14d881be45fac3f184dbcc3",
+  "message": "Unable to parse Message: \"this is an invalid synthetic message\" }",
+  "source.type": "error",
+  "failed_sensor_type": "bro",
+  "hostname": "node1",
+  "error_type": "parser_error",
+  "guid": "563d8d2a-1493-4758-be2f-5613bfd2d615",
+  "timestamp": 1548366516634,
+  "metron.metadata.topic": "bro",
+  "metron.metadata.customer": "acme-inc"
+}
+```
 
+By default, error messages are sent to the `indexing` topic.  This causes the errors to be indexed in whichever endpoints you have configured, such as Solr, Elasticsearch, or HDFS.  You may need to update the configuration of these endpoints so that the metadata fields contained in the error message are indexed correctly.  For example, you may need to add the metadata fields to the schema of your Solr error collection.
 
 ### `fieldTransformation` configuration
 
@@ -359,9 +383,9 @@ The format of a `fieldTransformation` is as follows:
 * `output` : The outputs to produce from the transformation.  If unspecified, it is assumed
to be the same as inputs.
 * `transformation` : The fully qualified classname of the transformation to be used.  This
is either a class which implements `FieldTransformation` or a member of the `FieldTransformations`
enum.
 * `config` : A String to Object map of transformation specific configuration.
- 
+
 The currently implemented fieldTransformations are:
-* `REMOVE` : This transformation removes the specified input fields.  If you want a conditional
removal, you can pass a Metron Query Language statement to define the conditions under which
you want to remove the fields. 
+* `REMOVE` : This transformation removes the specified input fields.  If you want a conditional
removal, you can pass a Metron Query Language statement to define the conditions under which
you want to remove the fields.
 
     Consider the following simple configuration which will remove `field1`
     unconditionally:
@@ -396,7 +420,7 @@ The currently implemented fieldTransformations are:
     }
     ```
 
-* `SELECT`: This transformation filters the fields in the message to include only the configured
output fields, and drops any not explicitly included. 
+* `SELECT`: This transformation filters the fields in the message to include only the configured
output fields, and drops any not explicitly included.
 
     For example:
 
@@ -459,7 +483,7 @@ and the values for the config map are the associated new field name.
     ```
 
 * `REGEX_SELECT` : This transformation lets users set an output field to one of a set of
possibilities based on matching regexes. This transformation is useful when the number or
conditions are large enough to make a stellar language match statement unwieldy.
- 
+
     The following config will set the field `logical_source_type` to one of the
     following, dependent upon the value of the `pix_type` field:
     * `cisco-6-302` if `pix_type` starts with either `6-302` or `06-302`
@@ -659,4 +683,3 @@ from your parser topology.
 - [JSON Path concept](http://goessner.net/articles/JsonPath/)
 - [Read about JSON Path library Apache Metron uses](https://github.com/json-path/JsonPath)
 - [Try JSON Path expressions online](http://jsonpath.herokuapp.com)
-
diff --git a/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
b/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
index df3ed1c..dfad188 100644
--- a/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
+++ b/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -162,6 +162,7 @@ public class ParserRunnerImpl implements ParserRunner<JSONObject>,
Serializable
                 .withErrorType(Constants.ErrorType.PARSER_ERROR)
                 .withThrowable(throwable)
                 .withSensorType(Collections.singleton(sensorType))
+                .withMetadata(rawMessage.getMetadata())
                 .addRawMessage(rawMessage.getMessage())));
 
         // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and
add them to the list of errors
@@ -169,6 +170,7 @@ public class ParserRunnerImpl implements ParserRunner<JSONObject>,
Serializable
                 .withErrorType(Constants.ErrorType.PARSER_ERROR)
                 .withThrowable(entry.getValue())
                 .withSensorType(Collections.singleton(sensorType))
+                .withMetadata(rawMessage.getMetadata())
                 .addRawMessage(entry.getKey())).collect(Collectors.toList()));
       }
     } else {
@@ -264,6 +266,7 @@ public class ParserRunnerImpl implements ParserRunner<JSONObject>,
Serializable
         MetronError error = new MetronError()
                 .withErrorType(Constants.ErrorType.PARSER_INVALID)
                 .withSensorType(Collections.singleton(sensorType))
+                .withMetadata(rawMessage.getMetadata())
                 .addRawMessage(message);
         Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
                 .flatMap(fieldValidator -> fieldValidator.getInput().stream())
diff --git a/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
b/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
index 29a625d..2d04d40 100644
--- a/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
+++ b/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
@@ -327,9 +327,14 @@ public class ParserRunnerImplTest {
 
   @Test
   public void shouldReturnMetronErrorOnInvalidMessage() {
+    Map<String, Object> metadata = new HashMap<>();
+    metadata.put("metron.metadata.topic", "bro");
+    metadata.put("metron.metadata.partition", 0);
+    metadata.put("metron.metadata.offset", 123);
+
     JSONObject inputMessage = new JSONObject();
     inputMessage.put("guid", "guid");
-    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), metadata);
 
     JSONObject expectedOutput  = new JSONObject();
     expectedOutput.put("guid", "guid");
@@ -337,6 +342,7 @@ public class ParserRunnerImplTest {
     MetronError expectedMetronError = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_INVALID)
             .withSensorType(Collections.singleton("bro"))
+            .withMetadata(metadata)
             .addRawMessage(inputMessage);
 
     when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
@@ -355,11 +361,16 @@ public class ParserRunnerImplTest {
 
   @Test
   public void shouldReturnMetronErrorOnFailedFieldValidator() {
+    Map<String, Object> metadata = new HashMap<>();
+    metadata.put("metron.metadata.topic", "bro");
+    metadata.put("metron.metadata.partition", 0);
+    metadata.put("metron.metadata.offset", 123);
+
     JSONObject inputMessage = new JSONObject();
     inputMessage.put("guid", "guid");
     inputMessage.put("ip_src_addr", "test");
     inputMessage.put("ip_dst_addr", "test");
-    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), metadata);
 
     JSONObject expectedOutput  = new JSONObject();
     expectedOutput.put("guid", "guid");
@@ -370,6 +381,7 @@ public class ParserRunnerImplTest {
             .withErrorType(Constants.ErrorType.PARSER_INVALID)
             .withSensorType(Collections.singleton("bro"))
             .addRawMessage(inputMessage)
+            .withMetadata(metadata)
             .withErrorFields(new HashSet<>(Arrays.asList("ip_src_addr", "ip_dst_addr")));
 
     when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);


Mime
View raw message