beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-2787] Fix MongoDbIO exception when trying to write empty bundle
Date Fri, 08 Sep 2017 12:29:34 GMT
Repository: beam
Updated Branches:
  refs/heads/master 80c86f81b -> 27a596131


[BEAM-2787] Fix MongoDbIO exception when trying to write empty bundle


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd274bac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd274bac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd274bac

Branch: refs/heads/master
Commit: fd274bac2386d4af79224c60ee214f3615dd7434
Parents: 80c86f8
Author: Ismaël Mejía <iemejia@gmail.com>
Authored: Fri Sep 8 11:59:54 2017 +0200
Committer: Ismaël Mejía <iemejia@gmail.com>
Committed: Fri Sep 8 12:47:25 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java   |  7 ++++---
 .../org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java    | 11 +++++++++++
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fd274bac/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 087123a..d29f0ae 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -646,7 +646,7 @@ public class MongoDbIO {
       builder.add(DisplayData.item("batchSize", batchSize()));
     }
 
-    private static class WriteFn extends DoFn<Document, Void> {
+    static class WriteFn extends DoFn<Document, Void> {
       private final Write spec;
       private transient MongoClient client;
       private List<Document> batch;
@@ -684,11 +684,12 @@ public class MongoDbIO {
       }
 
       private void flush() {
+        if (batch.isEmpty()) {
+          return;
+        }
         MongoDatabase mongoDatabase = client.getDatabase(spec.database());
         MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
-
         mongoCollection.insertMany(batch);
-
         batch.clear();
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fd274bac/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 67dbca4..a3fe063 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -270,4 +271,14 @@ public class MongoDbIOTest implements Serializable {
 
   }
 
+  @Test
+  public void testWriteEmptyCollection() throws Exception {
+    MongoDbIO.Write write =
+        MongoDbIO.write()
+            .withUri("mongodb://localhost:" + port)
+            .withDatabase("test")
+            .withCollection("empty");
+    DoFnTester<Document, Void> fnTester = DoFnTester.of(new MongoDbIO.Write.WriteFn(write));
+    fnTester.processBundle(new ArrayList<Document>());
+  }
 }


Mime
View raw message