Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DD0E4200CFE for ; Fri, 8 Sep 2017 18:45:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DBEDB1609C0; Fri, 8 Sep 2017 16:45:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D29301609BE for ; Fri, 8 Sep 2017 18:45:00 +0200 (CEST) Received: (qmail 706 invoked by uid 500); 8 Sep 2017 16:44:59 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 697 invoked by uid 99); 8 Sep 2017 16:44:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Sep 2017 16:44:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F3F08F3280; Fri, 8 Sep 2017 16:44:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: markap14@apache.org To: commits@nifi.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: nifi git commit: NIFI-4081 - Added raw message option in GrokReader This closes #1921. Date: Fri, 8 Sep 2017 16:44:58 +0000 (UTC) archived-at: Fri, 08 Sep 2017 16:45:02 -0000 Repository: nifi Updated Branches: refs/heads/master 1f67cbf62 -> 655960445 NIFI-4081 - Added raw message option in GrokReader This closes #1921. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65596044 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65596044 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65596044 Branch: refs/heads/master Commit: 65596044563bd8217a1fcfcd9bfafa02f2f13160 Parents: 1f67cbf Author: Pierre Villard Authored: Fri Jun 16 00:09:45 2017 +0200 Committer: Mark Payne Committed: Fri Sep 8 12:44:37 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/grok/GrokReader.java | 6 ++- .../org/apache/nifi/grok/GrokRecordReader.java | 15 ++++-- .../apache/nifi/grok/TestGrokRecordReader.java | 50 ++++++++++++++++---- 3 files changed, 57 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 30c7dd3..4a26975 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -61,7 +61,8 @@ import io.thekraken.grok.api.exception.GrokException; + "If a line in the input does not match the expected message pattern, the line of text is either considered to be part of the previous " + "message or is skipped, depending on the configuration, with the exception of stack traces. A stack trace that is found at the end of " + "a log message is considered to be part of the previous message but is added to the 'stackTrace' field of the Record. If a record has " - + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String).") + + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). " + + "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.") public class GrokReader extends SchemaRegistryService implements RecordReaderFactory { private volatile Grok grok; private volatile boolean appendUnmatchedLine; @@ -150,6 +151,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac populateSchemaFieldNames(grok, grokExpression, fields); fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType())); final RecordSchema schema = new SimpleRecordSchema(fields); return schema; @@ -241,4 +243,4 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac final RecordSchema schema = getSchema(variables, in, null); return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java index 65edf05..e7d81e4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java @@ -50,6 +50,8 @@ public class GrokRecordReader implements RecordReader { private String nextLine; static final String STACK_TRACE_COLUMN_NAME = "stackTrace"; + static final String RAW_MESSAGE_NAME = "_raw"; + private static final Pattern STACK_TRACE_PATTERN = Pattern.compile( "^\\s*(?:(?: |\\t)+at )|" + "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|" @@ -73,8 +75,11 @@ public class GrokRecordReader implements RecordReader { @Override public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { Map valueMap = null; + StringBuilder raw = new StringBuilder(); + while (valueMap == null || valueMap.isEmpty()) { final String line = nextLine == null ? reader.readLine() : nextLine; + raw.append(line); nextLine = null; // ensure that we don't process nextLine again if (line == null) { return null; @@ -98,9 +103,11 @@ public class GrokRecordReader implements RecordReader { // the stack trace ends. Otherwise, append the next line to the last field in the record. if (isStartOfStackTrace(nextLine)) { stackTrace = readStackTrace(nextLine); + raw.append("\n").append(stackTrace); break; } else if (append) { trailingText.append("\n").append(nextLine); + raw.append("\n").append(nextLine); } } else { // The next line matched our pattern. @@ -108,11 +115,11 @@ public class GrokRecordReader implements RecordReader { } } - final Record record = createRecord(valueMap, trailingText, stackTrace, coerceTypes, dropUnknownFields); + final Record record = createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes, dropUnknownFields); return record; } - private Record createRecord(final Map valueMap, final StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, final boolean dropUnknown) { + private Record createRecord(final Map valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes, final boolean dropUnknown) { final Map converted = new HashMap<>(); for (final Map.Entry entry : valueMap.entrySet()) { final String fieldName = entry.getKey(); @@ -179,6 +186,8 @@ public class GrokRecordReader implements RecordReader { } converted.put(STACK_TRACE_COLUMN_NAME, stackTrace); + converted.put(RAW_MESSAGE_NAME, raw); + return new MapRecord(schema, converted); } @@ -257,4 +266,4 @@ public class GrokRecordReader implements RecordReader { return schema; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index b849c0a..83286dc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -52,19 +52,24 @@ public class TestGrokRecordReader { final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"}; final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"}; + final String[] rawMessages = new String[] {"2016-11-08 21:24:23,029 INFO Test Message 1", + "2016-11-08 21:24:23,029 WARN Red", "2016-11-08 21:24:23,029 ERROR Green", + "2016-11-08 21:24:23,029 FATAL Blue", "2016-11-08 21:24:23,029 FINE Yellow"}; for (int i = 0; i < logLevels.length; i++) { final Object[] values = deserializer.nextRecord().getValues(); assertNotNull(values); - assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE + assertEquals(5, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE assertEquals("2016-11-08 21:24:23,029", values[0]); assertEquals(logLevels[i], values[1]); assertEquals(messages[i], values[2]); assertNull(values[3]); + assertEquals(rawMessages[i], values[4]); } assertNull(deserializer.nextRecord()); + deserializer.close(); } } @@ -83,13 +88,16 @@ public class TestGrokRecordReader { final Object[] values = deserializer.nextRecord().getValues(); assertNotNull(values); - assertEquals(6, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE + assertEquals(7, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE assertEquals("2016-08-04 13:26:32,473", values[0]); assertEquals("INFO", values[1]); assertEquals("Leader Election Notification Thread-1", values[2]); assertEquals("o.a.n.LoggerClass", values[3]); assertEquals("", values[4]); assertEquals("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces", values[5]); + assertEquals(msg, values[6]); + + deserializer.close(); } @@ -109,12 +117,14 @@ public class TestGrokRecordReader { final Object[] values = deserializer.nextRecord().getValues(); assertNotNull(values); - assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE + assertEquals(7, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE assertEquals(logLevels[i], values[1]); assertNull(values[5]); + assertNotNull(values[6]); } assertNull(deserializer.nextRecord()); + deserializer.close(); } } @@ -134,18 +144,23 @@ public class TestGrokRecordReader { final Object[] values = record.getValues(); assertNotNull(values); - assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE + assertEquals(7, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE assertEquals(logLevels[i], values[1]); if ("ERROR".equals(values[1])) { final String msg = (String) values[4]; assertEquals("One\nTwo\nThree", msg); assertNotNull(values[5]); + assertTrue(values[6].toString().startsWith("2016-08-04 13:26:32,474 ERROR [Leader Election Notification Thread-2] o.apache.nifi.controller.FlowController One")); + assertTrue(values[6].toString().endsWith(" at org.apache.nifi.cluster." + + "coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) " + + "[nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]\n ... 12 common frames omitted")); } else { assertNull(values[5]); } } assertNull(deserializer.nextRecord()); + deserializer.close(); } } @@ -168,9 +183,10 @@ public class TestGrokRecordReader { final Object[] values = deserializer.nextRecord().getValues(); assertNotNull(values); - assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE + assertEquals(5, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE assertEquals(logLevels[i], values[1]); assertEquals(messages[i], values[2]); + assertNotNull(values[4]); if (values[1].equals("ERROR")) { final String stackTrace = (String) values[3]; @@ -182,10 +198,21 @@ public class TestGrokRecordReader { assertTrue(stackTrace.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(" + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]")); assertTrue(stackTrace.endsWith(" ... 12 common frames omitted")); + + final String raw = (String) values[4]; + assertTrue(raw.startsWith("2016-11-23 16:00:02,689 ERROR Log message with stack trace")); + assertTrue(raw.contains("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces")); + assertTrue(raw.contains(" at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(" + + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]")); + assertTrue(raw.contains("Caused by: org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces")); + assertTrue(raw.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(" + + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]")); + assertTrue(raw.endsWith(" ... 12 common frames omitted")); } } assertNull(deserializer.nextRecord()); + deserializer.close(); } } @@ -201,7 +228,7 @@ public class TestGrokRecordReader { final RecordSchema schema = GrokReader.createRecordSchema(grok); final List fieldNames = schema.getFieldNames(); - assertEquals(8, fieldNames.size()); + assertEquals(9, fieldNames.size()); assertTrue(fieldNames.contains("timestamp")); assertTrue(fieldNames.contains("logsource")); assertTrue(fieldNames.contains("facility")); @@ -210,6 +237,7 @@ public class TestGrokRecordReader { assertTrue(fieldNames.contains("pid")); assertTrue(fieldNames.contains("message")); assertTrue(fieldNames.contains("stackTrace")); // always implicitly there + assertTrue(fieldNames.contains("_raw")); // always implicitly there final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, true); final Record record = deserializer.nextRecord(); @@ -221,8 +249,10 @@ public class TestGrokRecordReader { assertEquals("nifi", record.getValue("program")); assertEquals("12345", record.getValue("pid")); assertEquals("My Message", record.getValue("message")); + assertEquals("May 22 15:58:23 my-host nifi[12345]:My Message", record.getValue("_raw")); assertNull(deserializer.nextRecord()); + deserializer.close(); } } @@ -241,7 +271,7 @@ public class TestGrokRecordReader { final RecordSchema schema = GrokReader.createRecordSchema(grok); final List fieldNames = schema.getFieldNames(); - assertEquals(6, fieldNames.size()); + assertEquals(7, fieldNames.size()); assertTrue(fieldNames.contains("first")); assertTrue(fieldNames.contains("second")); assertTrue(fieldNames.contains("third")); @@ -258,6 +288,7 @@ public class TestGrokRecordReader { assertEquals("5", record.getValue("fifth")); assertNull(deserializer.nextRecord()); + deserializer.close(); } } @@ -276,7 +307,7 @@ public class TestGrokRecordReader { final RecordSchema schema = GrokReader.createRecordSchema(grok); final List fieldNames = schema.getFieldNames(); - assertEquals(6, fieldNames.size()); + assertEquals(7, fieldNames.size()); assertTrue(fieldNames.contains("first")); assertTrue(fieldNames.contains("second")); assertTrue(fieldNames.contains("third")); @@ -295,6 +326,7 @@ public class TestGrokRecordReader { } assertNull(deserializer.nextRecord()); + deserializer.close(); } } -} +} \ No newline at end of file