flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject flume git commit: FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly
Date Tue, 21 Apr 2015 07:00:12 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.7 a8c0f90af -> d2fc881f5


FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly

(Benjamin Fiorini via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d2fc881f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d2fc881f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d2fc881f

Branch: refs/heads/flume-1.7
Commit: d2fc881f549568ea640fa29a96b0b37da64b225d
Parents: a8c0f90
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon Apr 20 23:59:18 2015 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon Apr 20 23:59:59 2015 -0700

----------------------------------------------------------------------
 .../sink/elasticsearch/ContentBuilderUtil.java  | 25 +++++++++-----
 .../AbstractElasticSearchSinkTest.java          |  3 +-
 .../elasticsearch/TestElasticSearchSink.java    | 34 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d2fc881f/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
index de0acf4..83c3ffd 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
@@ -55,20 +55,29 @@ public class ContentBuilderUtil {
 
   public static void addComplexField(XContentBuilder builder, String fieldName,
       XContentType contentType, byte[] data) throws IOException {
-    XContentParser parser =
-      XContentFactory.xContent(contentType).createParser(data);
-    parser.nextToken();
-    // Add the field name, but not the value.
-    builder.field(fieldName);
+    XContentParser parser = null;
     try {
+      // Elasticsearch will accept JSON directly but we need to validate that
+      // the incoming event is JSON first. Sadly, the elasticsearch JSON parser
+      // is a stream parser so we need to instantiate it, parse the event to
+      // validate it, then instantiate it again to provide the JSON to
+      // elasticsearch.
+      // If validation fails then the incoming event is submitted to
+      // elasticsearch as plain text.
+      parser = XContentFactory.xContent(contentType).createParser(data);
+      while (parser.nextToken() != null) {};
+
+      // If the JSON is valid then include it
+      parser = XContentFactory.xContent(contentType).createParser(data);
+      // Add the field name, but not the value.
+      builder.field(fieldName);
       // This will add the whole parsed content as the value of the field.
       builder.copyCurrentStructure(parser);
     } catch (JsonParseException ex) {
       // If we get an exception here the most likely cause is nested JSON that
       // can't be figured out in the body. At this point just push it through
-      // as is, we have already added the field so don't do it again
-      builder.endObject();
-      builder.field(fieldName, new String(data, charset));
+      // as is
+      addSimpleField(builder, fieldName, data);
     } finally {
       if (parser != null) {
         parser.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/d2fc881f/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
index 2f8fd6d..f9272fa 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -127,7 +127,8 @@ public abstract class AbstractElasticSearchSinkTest {
   void assertBodyQuery(int expectedHits, Event... events) {
     // Perform Multi Field Match
     assertSearch(expectedHits,
-        performSearch(QueryBuilders.fieldQuery("@message", "event")), null);
+        performSearch(QueryBuilders.fieldQuery("@message", "event")),
+        null, events);
   }
 
   SearchResponse performSearch(QueryBuilder query) {

http://git-wip-us.apache.org/repos/asf/flume/blob/d2fc881f/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 78e1665..a58f344 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -95,6 +95,40 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
   }
 
   @Test
+  public void shouldIndexInvalidComplexJsonBody() throws Exception {
+    parameters.put(BATCH_SIZE, "3");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event event1 = EventBuilder.withBody("TEST1 {test}".getBytes());
+    channel.put(event1);
+    Event event2 = EventBuilder.withBody("{test: TEST2 }".getBytes());
+    channel.put(event2);
+    Event event3 = EventBuilder.withBody("{\"test\":{ TEST3 {test} }}".getBytes());
+    channel.put(event3);
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+
+    assertMatchAllQuery(3);
+    assertSearch(1,
+        performSearch(QueryBuilders.fieldQuery("@message", "TEST1")),
+        null, event1);
+    assertSearch(1,
+        performSearch(QueryBuilders.fieldQuery("@message", "TEST2")),
+        null, event2);
+    assertSearch(1,
+        performSearch(QueryBuilders.fieldQuery("@message", "TEST3")),
+        null, event3);
+  }
+
+  @Test
   public void shouldIndexComplexJsonEvent() throws Exception {
     Configurables.configure(fixture, new Context(parameters));
     Channel channel = bindAndStartChannel(fixture);


Mime
View raw message