metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject incubator-metron git commit: METRON-133 ParserBolt is not failing tuples when parsing fails (merrimanr via dlyle65535) closes apache/incubator-metron#104
Date Wed, 04 May 2016 17:32:42 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master fd2d209ca -> 5bcee4bf2


METRON-133 ParserBolt is not failing tuples when parsing fails (merrimanr via dlyle65535)
closes apache/incubator-metron#104


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/5bcee4bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/5bcee4bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/5bcee4bf

Branch: refs/heads/master
Commit: 5bcee4bf20442d034655788c0a6a634253eeb0e2
Parents: fd2d209
Author: merrimanr <merrimanr@gmail.com>
Authored: Wed May 4 13:31:42 2016 -0400
Committer: David Lyle <dlyle65535@gmail.com>
Committed: Wed May 4 13:31:42 2016 -0400

----------------------------------------------------------------------
 .../org/apache/metron/parsers/GrokParser.java   | 91 ++++++++++++++++++--
 .../apache/metron/parsers/bolt/ParserBolt.java  |  1 +
 2 files changed, 84 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5bcee4bf/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index 39a88bb..ee6ac37 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -52,6 +52,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   protected String timestampField;
   protected String dateFormat = "yyyy-MM-dd HH:mm:ss.S z";
   protected TimeZone timeZone = TimeZone.getTimeZone("UTC");
+  protected String patternsCommonDir = "/patterns/common";
 
   public GrokParser(String grokHdfsPath, String patterLabel) {
     this.grokHdfsPath = grokHdfsPath;
@@ -60,21 +61,33 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   public GrokParser withTimestampField(String timestampField) {
     this.timestampField = timestampField;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok parser settting timestamp field: " + timestampField);
+    }
     return this;
   }
 
   public GrokParser withTimeFields(String... timeFields) {
     this.timeFields = timeFields;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok parser settting time fields: " + timeFields);
+    }
     return this;
   }
 
   public GrokParser withDateFormat(String dateFormat) {
     this.dateFormat = dateFormat;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok parser settting date format: " + dateFormat);
+    }
     return this;
   }
 
   public GrokParser withTimeZone(String timeZone) {
     this.timeZone = TimeZone.getTimeZone(timeZone);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok parser settting timezone: " + timeZone);
+    }
     return this;
   }
 
@@ -94,17 +107,39 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   public void init() {
     grok = new Grok();
     try {
-      InputStream commonInputStream = openInputStream("/patterns/common");
-      if(commonInputStream == null) {
-        throw new RuntimeException("Unable to initialize grok parser: Unable to load /patterns/common from either classpath or HDFS" );
+      InputStream commonInputStream = openInputStream(patternsCommonDir);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Grok parser loading common patterns from: " + patternsCommonDir);
       }
+
+      if (commonInputStream == null) {
+        throw new RuntimeException(
+                "Unable to initialize grok parser: Unable to load " + patternsCommonDir + " from either classpath or HDFS");
+      }
+
       grok.addPatternFromReader(new InputStreamReader(commonInputStream));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Loading parser-specific patterns from: " + grokHdfsPath);
+      }
+
       InputStream patterInputStream = openInputStream(grokHdfsPath);
-      if(patterInputStream == null) {
-        throw new RuntimeException("Unable to initialize grok parser: Unable to load " + grokHdfsPath + " from either classpath or HDFS" );
+      if (patterInputStream == null) {
+        throw new RuntimeException("Grok parser unable to initialize grok parser: Unable to load " + grokHdfsPath
+                + " from either classpath or HDFS");
       }
       grok.addPatternFromReader(new InputStreamReader(patterInputStream));
-      grok.compile("%{" + patternLabel + "}");
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Grok parser set the following grok expression: " + grok.getNamedRegexCollectionById(patternLabel));
+      }
+
+      String grokPattern = "%{" + patternLabel + "}";
+
+      grok.compile(grokPattern);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compiled grok pattern" + grokPattern);
+      }
+
     } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
       throw new RuntimeException("Grok parser Error: " + e.getMessage(), e);
@@ -114,16 +149,27 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   @SuppressWarnings("unchecked")
   @Override
   public List<JSONObject> parse(byte[] rawMessage) {
-    if (grok == null) init();
+    if (grok == null) {
+      init();
+    }
     List<JSONObject> messages = new ArrayList<>();
     try {
       String originalMessage = new String(rawMessage, "UTF-8");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Grok perser parsing message: " + originalMessage);
+      }
       Match gm = grok.match(originalMessage);
       gm.captures();
       JSONObject message = new JSONObject();
       message.putAll(gm.toMap());
+
+      if (message.size() == 0)
+        throw new RuntimeException("Grok statement produced a null message. Original message was: "
+                + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: "
+                + grokHdfsPath);
+
       message.put("original_string", originalMessage);
-      for(String timeField: timeFields) {
+      for (String timeField : timeFields) {
         String fieldValue = (String) message.get(timeField);
         if (fieldValue != null) {
           message.put(timeField, toEpoch(fieldValue));
@@ -134,6 +180,9 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
       }
       message.remove(patternLabel);
       messages.add(message);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Grok parser parsed message: " + message);
+      }
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -143,24 +192,50 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   @Override
   public boolean validate(JSONObject message) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok parser validating message: " + message);
+    }
+
     Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName());
     if (timestampObject instanceof Long) {
       Long timestamp = (Long) timestampObject;
       if (timestamp > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Grok parser validated message: " + message);
+        }
         return true;
       }
     }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok parser did not validate message: " + message);
+    }
     return false;
   }
 
   private long toEpoch(String datetime) throws ParseException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok perser converting timestamp to epoch: " + datetime);
+    }
+
     SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
     sdf.setTimeZone(timeZone);
     Date date = sdf.parse(datetime);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok perser converted timestamp to epoch: " + sdf.parse(datetime));
+    }
+
     return date.getTime();
   }
 
   protected long formatTimestamp(Object value) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grok perser formatting timestamp" + value);
+    }
+
+
     if (value == null) {
       throw new RuntimeException(patternLabel + " pattern does not include field " + timestampField);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5bcee4bf/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index e29f900..f0f1bd8 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -79,6 +79,7 @@ public class ParserBolt extends ConfiguredBolt {
       collector.ack(tuple);
     } catch (Throwable ex) {
       ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
+      collector.fail(tuple);
     }
   }
 


Mime
View raw message