beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject beam git commit: [BEAM-2256] Add the last previous range filter
Date Thu, 11 May 2017 21:57:02 GMT
Repository: beam
Updated Branches:
  refs/heads/master 171befaee -> e10662df9


[BEAM-2256] Add the last previous range filter


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10662df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10662df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10662df

Branch: refs/heads/master
Commit: e10662df9d080162ab1c4f664165c5dcd1962c08
Parents: 171befa
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Thu May 11 22:09:50 2017 +0200
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu May 11 14:56:52 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 47 ++++++++++++++------
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      | 18 ++++++++
 2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e10662df/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 7236a50..620df74 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
@@ -184,7 +185,11 @@ public class MongoDbIO {
     }
   }
 
-  private static class BoundedMongoDbSource extends BoundedSource<Document> {
+  /**
+   * A MongoDB {@link BoundedSource} reading {@link Document} from  a given instance.
+   */
+  @VisibleForTesting
+  static class BoundedMongoDbSource extends BoundedSource<Document> {
     private Read spec;
 
     private BoundedMongoDbSource(Read spec) {
@@ -294,7 +299,8 @@ public class MongoDbIO {
      * @param additionalFilter A custom (user) additional filter to append to the range filters.
      * @return A list of filters containing the ranges.
      */
-    private static List<String> splitKeysToFilters(List<Document> splitKeys,
String
+    @VisibleForTesting
+    static List<String> splitKeysToFilters(List<Document> splitKeys, String
         additionalFilter) {
       ArrayList<String> filters = new ArrayList<>();
       String lowestBound = null; // lower boundary (previous split in the iteration)
@@ -306,30 +312,45 @@ public class MongoDbIO {
           // the range from the beginning up to this split
           rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}",
               splitKey);
+          filters.add(formatFilter(rangeFilter, additionalFilter));
         } else if (i == splitKeys.size() - 1) {
-          // this is the last split in the list, the filter defines
-          // the range from the split up to the end
+          // this is the last split in the list, the filters define
+          // the range from the previous split to the current split and also
+          // the current split to the end
+          rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),"
+              + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
+          filters.add(formatFilter(rangeFilter, additionalFilter));
           rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}",
               splitKey);
+          filters.add(formatFilter(rangeFilter, additionalFilter));
         } else {
           // we are between two splits
           rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),"
               + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
+          filters.add(formatFilter(rangeFilter, additionalFilter));
         }
-        if (additionalFilter != null && !additionalFilter.isEmpty()) {
-          // user provided a filter, we append the user filter to the range filter
-          rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter);
-        } else {
-          // user didn't provide a filter, just cleany close the range filter
-          rangeFilter = String.format("%s ]}", rangeFilter);
-        }
-
-        filters.add(rangeFilter);
 
         lowestBound = splitKey;
       }
       return filters;
     }
+
+    /**
+     * Closes a range filter's {@code $and} array, appending the user's filter if one is given.
+     *
+     * @param filter The range filter, whose {@code $and} array is still open.
+     * @param additionalFilter The user's filter; null or empty means no user filter.
+     * @return The range filter with its {@code $and} array closed.
+     */
+    private static String formatFilter(String filter, @Nullable String additionalFilter)
{
+      if (additionalFilter != null && !additionalFilter.isEmpty()) {
+        // user provided a filter, we append the user filter to the range filter
+        return String.format("%s,%s ]}", filter, additionalFilter);
+      } else {
+        // user didn't provide a filter, just cleanly close the range filter
+        return String.format("%s ]}", filter);
+      }
+    }
   }
 
   private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<Document>
{

http://git-wip-us.apache.org/repos/asf/beam/blob/e10662df/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 454c6ba..cd26b48 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -38,6 +38,8 @@ import java.io.File;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -139,6 +141,22 @@ public class MongoDbIOTest implements Serializable {
   }
 
   @Test
+  public void testSplitIntoFilters() throws Exception {
+    ArrayList<Document> documents = new ArrayList<>();
+    documents.add(new Document("_id", 56)); // ints stand in for ObjectIds; only text is checked
+    documents.add(new Document("_id", 109));
+    documents.add(new Document("_id", 256));
+    List<String> filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents,
null);
+    assertEquals(4, filters.size()); // 3 split keys -> 4 ranges; no user filter given
+    assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
+    assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}",
+        filters.get(1)); // range between consecutive split keys
+    assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}",
+        filters.get(2)); // range ending at the last split key
+    assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", filters.get(3));
+  }
+
+  @Test
   public void testFullRead() throws Exception {
 
     PCollection<Document> output = pipeline.apply(


Mime
View raw message