nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject nifi git commit: NIFI-4269 Added the ability to serialize Mongo documents to a clean JSON view instead of just extended JSON.
Date Wed, 06 Sep 2017 19:56:53 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 9ac88d210 -> 527ce0b4e


NIFI-4269 Added the ability to serialize Mongo documents to a clean JSON view instead of just extended JSON.

NIFI-4269 incorporated changes from the code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2063


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/527ce0b4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/527ce0b4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/527ce0b4

Branch: refs/heads/master
Commit: 527ce0b4efe7e23f95f89f57ace7944aafee8df0
Parents: 9ac88d2
Author: Mike Thomsen <mikerthomsen@gmail.com>
Authored: Mon Aug 7 10:59:45 2017 -0400
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Wed Sep 6 15:56:12 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/mongodb/GetMongo.java       | 86 +++++++++++++++-----
 .../processors/mongodb/ObjectIdSerializer.java  | 45 ++++++++++
 .../nifi/processors/mongodb/GetMongoTest.java   | 39 +++++++--
 3 files changed, 146 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/527ce0b4/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 79f50b2..185d012 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -18,15 +18,6 @@
  */
 package org.apache.nifi.processors.mongodb;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -35,6 +26,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -49,8 +41,19 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
+import org.bson.json.JsonWriterSettings;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -120,12 +123,31 @@ public class GetMongo extends AbstractMongoProcessor {
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
 
+    static final String JSON_TYPE_EXTENDED = "Extended";
+    static final String JSON_TYPE_STANDARD   = "Standard";
+    static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
+            "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver");
+    static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
+            "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions.");
+    static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder()
+            .allowableValues(JSON_EXTENDED, JSON_STANDARD)
+            .defaultValue(JSON_TYPE_EXTENDED)
+            .displayName("JSON Type")
+            .name("json-type")
+            .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" +
+            " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
+            " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
 
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(JSON_TYPE);
         _propertyDescriptors.add(QUERY);
         _propertyDescriptors.add(PROJECTION);
         _propertyDescriptors.add(SORT);
@@ -151,17 +173,34 @@ public class GetMongo extends AbstractMongoProcessor {
         return propertyDescriptors;
     }
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper;
 
     //Turn a list of Mongo result documents into a String representation of a JSON array
-    private String buildBatch(List<Document> documents) throws IOException {
-        List<Map> docs = new ArrayList<>();
-        for (Document document : documents) {
-            String asJson = document.toJson();
-            docs.add(mapper.readValue(asJson, Map.class));
+    private String buildBatch(List<Document> documents, String jsonTypeSetting) throws IOException {
+        StringBuilder builder = new StringBuilder();
+        for (int index = 0; index < documents.size(); index++) {
+            Document document = documents.get(index);
+            String asJson;
+            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
+                asJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(document);
+            } else {
+                asJson = document.toJson(new JsonWriterSettings(true));
+            }
+            builder
+                .append(asJson)
+                .append( (documents.size() > 1 && index + 1 < documents.size()) ? ", " : "" );
         }
 
-        return mapper.writeValueAsString(docs);
+        return "[" + builder.toString() + "]";
+    }
+
+    private void configureMapper(String setting) {
+        mapper = new ObjectMapper();
+        if (setting.equals(JSON_TYPE_STANDARD)) {
+            mapper.registerModule(ObjectIdSerializer.getModule());
+            DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+            mapper.setDateFormat(df);
+        }
     }
 
     private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
@@ -187,6 +226,9 @@ public class GetMongo extends AbstractMongoProcessor {
                 ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null;
         final Document sort = context.getProperty(SORT).isSet()
                 ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null;
+        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
+        configureMapper(jsonTypeSetting);
+
 
         final MongoCollection<Document> collection = getCollection(context);
 
@@ -220,7 +262,7 @@ public class GetMongo extends AbstractMongoProcessor {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Writing batch...");
                                 }
-                                String payload = buildBatch(batch);
+                                String payload = buildBatch(batch, jsonTypeSetting);
                                 writeBatch(payload, context, session);
                                 batch = new ArrayList<>();
                             } catch (IOException ex) {
@@ -230,7 +272,7 @@ public class GetMongo extends AbstractMongoProcessor {
                     }
                     if (batch.size() > 0) {
                         try {
-                            writeBatch(buildBatch(batch), context, session);
+                            writeBatch(buildBatch(batch, jsonTypeSetting), context, session);
                         } catch (IOException ex) {
                             getLogger().error("Error sending remainder of batch", ex);
                         }
@@ -241,7 +283,13 @@ public class GetMongo extends AbstractMongoProcessor {
                         flowFile = session.write(flowFile, new OutputStreamCallback() {
                             @Override
                             public void process(OutputStream out) throws IOException {
-                                IOUtils.write(cursor.next().toJson(), out);
+                                String json;
+                                if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
+                                    json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
+                                } else {
+                                    json = cursor.next().toJson();
+                                }
+                                IOUtils.write(json, out);
                             }
                         });
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");

http://git-wip-us.apache.org/repos/asf/nifi/blob/527ce0b4/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java
new file mode 100644
index 0000000..3f207e3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/ObjectIdSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import org.bson.types.ObjectId;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+import java.io.IOException;
+
+public class ObjectIdSerializer extends JsonSerializer<ObjectId> {
+    @Override
+    public void serialize(ObjectId objectId, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+        jsonGenerator.writeString(objectId.toString());
+    }
+
+    public static SimpleModule getModule() {
+        SimpleModule module = new SimpleModule("ObjectID Serializer", Version.unknownVersion());
+        ObjectIdSerializer serializer = new ObjectIdSerializer();
+        module.addSerializer(ObjectId.class, serializer);
+
+        return module;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/527ce0b4/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
index 455d705..3a045a8 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.nifi.processors.mongodb;
 
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -31,6 +34,7 @@ import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.bson.Document;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,11 +52,17 @@ public class GetMongoTest {
     private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase();
     private static final String COLLECTION_NAME = "test";
 
-    private static final List<Document> DOCUMENTS = Lists.newArrayList(
-        new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
-        new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
-        new Document("_id", "doc_3").append("a", 1).append("b", 3)
+    private static final List<Document> DOCUMENTS;
+    private static final Calendar CAL;
+
+    static {
+        CAL = Calendar.getInstance();
+        DOCUMENTS = Lists.newArrayList(
+            new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
+            new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()),
+            new Document("_id", "doc_3").append("a", 1).append("b", 3)
         );
+    }
 
     private TestRunner runner;
     private MongoClient mongoClient;
@@ -82,6 +92,7 @@ public class GetMongoTest {
 
     @Test
     public void testValidators() {
+
         TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
         Collection<ValidationResult> results;
         ProcessContext pc;
@@ -120,7 +131,7 @@ public class GetMongoTest {
             results = ((MockProcessContext) pc).validate();
         }
         Assert.assertEquals(1, results.size());
-        Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException"));
+        Assert.assertTrue(results.iterator().next().toString().contains("is invalid because"));
 
         // invalid projection
         runner.setVariable("projection", "{a: x,y,z}");
@@ -149,6 +160,24 @@ public class GetMongoTest {
     }
 
     @Test
+    public void testCleanJson() throws Exception {
+        runner.setVariable("query", "{\"_id\": \"doc_2\"}");
+        runner.setProperty(GetMongo.QUERY, "${query}");
+        runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        byte[] raw = runner.getContentAsByteArray(flowFiles.get(0));
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, Object> parsed = mapper.readValue(raw, Map.class);
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+
+        Assert.assertTrue(parsed.get("date_field").getClass() == String.class);
+        Assert.assertTrue(((String)parsed.get("date_field")).startsWith(format.format(CAL.getTime())));
+    }
+
+    @Test
     public void testReadOneDocument() throws Exception {
         runner.setVariable("query", "{a: 1, b: 3}");
         runner.setProperty(GetMongo.QUERY, "${query}");


Mime
View raw message