beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-975] Improve default connection options, javadoc and style in MongoDbIO
Date Mon, 19 Jun 2017 19:24:16 GMT
Repository: beam
Updated Branches:
  refs/heads/master 1476f3412 -> b400f4a6f


[BEAM-975] Improve default connection options, javadoc and style in MongoDbIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87be64e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87be64e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87be64e9

Branch: refs/heads/master
Commit: 87be64e9817da5e5c86a243471021268d6281b33
Parents: 1476f34
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Fri May 12 15:21:49 2017 +0200
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Mon Jun 19 21:23:11 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 315 +++++++++++++++----
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      |  37 +++
 2 files changed, 283 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/87be64e9/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 620df74..04d9975 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -18,12 +18,13 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -100,12 +101,20 @@ public class MongoDbIO {
 
   /** Read data from MongoDB. */
   public static Read read() {
-    return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
+    return new AutoValue_MongoDbIO_Read.Builder()
+        .setKeepAlive(true)
+        .setMaxConnectionIdleTime(60000)
+        .setNumSplits(0)
+        .build();
   }
 
   /** Write data to MongoDB. */
   public static Write write() {
-    return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
+    return new AutoValue_MongoDbIO_Write.Builder()
+        .setKeepAlive(true)
+        .setMaxConnectionIdleTime(60000)
+        .setBatchSize(1024L)
+        .build();
   }
 
   private MongoDbIO() {
@@ -117,16 +126,20 @@ public class MongoDbIO {
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<Document>>
{
     @Nullable abstract String uri();
+    abstract boolean keepAlive();
+    abstract int maxConnectionIdleTime();
     @Nullable abstract String database();
     @Nullable abstract String collection();
     @Nullable abstract String filter();
     abstract int numSplits();
 
-    abstract Builder toBuilder();
+    abstract Builder builder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUri(String uri);
+      abstract Builder setKeepAlive(boolean keepAlive);
+      abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
       abstract Builder setDatabase(String database);
       abstract Builder setCollection(String collection);
       abstract Builder setFilter(String filter);
@@ -135,31 +148,94 @@ public class MongoDbIO {
     }
 
     /**
-     * Example documentation for withUri.
+     * Define the location of the MongoDB instances using a URI. The URI describes the hosts
to
+     * be used and some options.
+     *
+     * <p>The format of the URI is:
+     *
+     * <pre>{@code
+     * mongodb://[username:password@]host1[:port1]...[,hostN[:portN]]][/[database][?options]]
+     * }</pre>
+     *
+     * <p>Where:
+     *   <ul>
+     *     <li>{@code mongodb://} is a required prefix to identify that this is a string
in the
+     *     standard connection format.</li>
+     *     <li>{@code username:password@} are optional. If given, the driver will attempt
to
+     *     login to a database after connecting to a database server. For some authentication
+     *     mechanisms, only the username is specified and the password is not, in which case
+     *     the ":" after the username is left off as well.</li>
+     *     <li>{@code host1} is the only required part of the URI. It identifies a
server
+     *     address to connect to.</li>
+     *     <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
+     *     <li>{@code /database} is the name of the database to login to and thus is
only
+     *     relevant if the {@code username:password@} syntax is used. If not specified, the
+     * "admin" database will be used by default. It has to be equivalent to the database
+     * you specify with {@link Read#withDatabase(String)}.</li>
+     *     <li>{@code ?options} are connection options. Note that if {@code database}
is absent
+     *     there is still a {@code /} required between the last {@code host} and the {@code
?}
+     *     introducing the options. Options are name=value pairs and the pairs are separated
by
+     *     "{@code &}". The {@code KeepAlive} connection option can't be passed via the
URI,
+     *     instead you have to use {@link Read#withKeepAlive(boolean)}. Same for the
+     *     {@code MaxConnectionIdleTime} connection option via
+     *     {@link Read#withMaxConnectionIdleTime(int)}.
+     *     </li>
+     *   </ul>
      */
     public Read withUri(String uri) {
-      checkNotNull(uri);
-      return toBuilder().setUri(uri).build();
+      checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri");
+      return builder().setUri(uri).build();
+    }
+
+    /**
+     * Sets whether socket keep alive is enabled.
+     */
+    public Read withKeepAlive(boolean keepAlive) {
+      return builder().setKeepAlive(keepAlive).build();
+    }
+
+    /**
+     * Sets the maximum idle time for a pooled connection.
+     */
+    public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
+      return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
     }
 
+    /**
+     * Sets the database to use.
+     */
     public Read withDatabase(String database) {
-      checkNotNull(database);
-      return toBuilder().setDatabase(database).build();
+      checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with
null"
+          + " database");
+      return builder().setDatabase(database).build();
     }
 
+    /**
+     * Sets the collection to consider in the database.
+     */
     public Read withCollection(String collection) {
-      checkNotNull(collection);
-      return toBuilder().setCollection(collection).build();
+      checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called
"
+          + "with null collection");
+      return builder().setCollection(collection).build();
     }
 
+    /**
+     * Sets a filter on the documents in a collection.
+     */
     public Read withFilter(String filter) {
-      checkNotNull(filter);
-      return toBuilder().setFilter(filter).build();
+      checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null
"
+          + "filter");
+      return builder().setFilter(filter).build();
     }
 
+    /**
+     * Sets the user defined number of splits.
+     */
     public Read withNumSplits(int numSplits) {
-      checkArgument(numSplits >= 0);
-      return toBuilder().setNumSplits(numSplits).build();
+      checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called
with "
+          + "invalid number. The number of splits has to be a positive value (currently %d)",
+          numSplits);
+      return builder().setNumSplits(numSplits).build();
     }
 
     @Override
@@ -169,15 +245,19 @@ public class MongoDbIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkNotNull(uri(), "uri");
-      checkNotNull(database(), "database");
-      checkNotNull(collection(), "collection");
+      checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)");
+      checkState(database() != null, "MongoDbIO.read() requires a database to be set via
"
+          + "withDatabase(database)");
+      checkState(collection() != null, "MongoDbIO.read() requires a collection to be set
via "
+          + "withCollection(collection)");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("uri", uri()));
+      builder.add(DisplayData.item("keepAlive", keepAlive()));
+      builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
       builder.add(DisplayData.item("database", database()));
       builder.add(DisplayData.item("collection", collection()));
       builder.addIfNotNull(DisplayData.item("filter", filter()));
@@ -218,61 +298,71 @@ public class MongoDbIO {
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
-      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+      try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
+        return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection());
+      }
+    }
+
+    private long getEstimatedSizeBytes(MongoClient mongoClient,
+                                       String database,
+                                       String collection) {
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
 
       // get the Mongo collStats object
       // it gives the size for the entire collection
       BasicDBObject stat = new BasicDBObject();
-      stat.append("collStats", spec.collection());
+      stat.append("collStats", collection);
       Document stats = mongoDatabase.runCommand(stat);
+
       return stats.get("size", Number.class).longValue();
     }
 
     @Override
     public List<BoundedSource<Document>> split(long desiredBundleSizeBytes,
                                                 PipelineOptions options) {
-      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
-      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
-
-      List<Document> splitKeys;
-      if (spec.numSplits() > 0) {
-        // the user defines his desired number of splits
-        // calculate the batch size
-        long estimatedSizeBytes = getEstimatedSizeBytes(options);
-        desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
-      }
+      try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
+        MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+
+        List<Document> splitKeys;
+        if (spec.numSplits() > 0) {
+          // the user defines his desired number of splits
+          // calculate the batch size
+          long estimatedSizeBytes = getEstimatedSizeBytes(mongoClient,
+              spec.database(), spec.collection());
+          desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
+        }
 
-      // the desired batch size is small, using default chunk size of 1MB
-      if (desiredBundleSizeBytes < 1024 * 1024) {
-        desiredBundleSizeBytes = 1 * 1024 * 1024;
-      }
+        // the desired batch size is small, using default chunk size of 1MB
+        if (desiredBundleSizeBytes < 1024 * 1024) {
+          desiredBundleSizeBytes = 1 * 1024 * 1024;
+        }
 
-      // now we have the batch size (provided by user or provided by the runner)
-      // we use Mongo splitVector command to get the split keys
-      BasicDBObject splitVectorCommand = new BasicDBObject();
-      splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
-      splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
-      splitVectorCommand.append("force", false);
-      // maxChunkSize is the Mongo partition size in MB
-      LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
-      splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
-      Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
-      splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
-
-      List<BoundedSource<Document>> sources = new ArrayList<>();
-      if (splitKeys.size() < 1) {
-        LOG.debug("Split keys is low, using an unique source");
-        sources.add(this);
-        return sources;
-      }
+        // now we have the batch size (provided by user or provided by the runner)
+        // we use Mongo splitVector command to get the split keys
+        BasicDBObject splitVectorCommand = new BasicDBObject();
+        splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
+        splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+        splitVectorCommand.append("force", false);
+        // maxChunkSize is the Mongo partition size in MB
+        LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+        splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+        Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+        splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+        List<BoundedSource<Document>> sources = new ArrayList<>();
+        if (splitKeys.size() < 1) {
+          LOG.debug("Split keys is low, using an unique source");
+          sources.add(this);
+          return sources;
+        }
 
-      LOG.debug("Number of splits is {}", splitKeys.size());
-      for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
-        sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
-      }
+        LOG.debug("Number of splits is {}", splitKeys.size());
+        for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+          sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
+        }
 
-      return sources;
+        return sources;
+      }
     }
 
     /**
@@ -367,7 +457,10 @@ public class MongoDbIO {
     @Override
     public boolean start() {
       Read spec = source.spec;
-      client = new MongoClient(new MongoClientURI(spec.uri()));
+      MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
+      optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
+      optionsBuilder.socketKeepAlive(spec.keepAlive());
+      client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder));
 
       MongoDatabase mongoDatabase = client.getDatabase(spec.database());
 
@@ -426,36 +519,106 @@ public class MongoDbIO {
    */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<Document>, PDone>
{
+
     @Nullable abstract String uri();
+    abstract boolean keepAlive();
+    abstract int maxConnectionIdleTime();
     @Nullable abstract String database();
     @Nullable abstract String collection();
     abstract long batchSize();
 
-    abstract Builder toBuilder();
+    abstract Builder builder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUri(String uri);
+      abstract Builder setKeepAlive(boolean keepAlive);
+      abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
       abstract Builder setDatabase(String database);
       abstract Builder setCollection(String collection);
       abstract Builder setBatchSize(long batchSize);
       abstract Write build();
     }
 
+    /**
+     * Define the location of the MongoDB instances using a URI. The URI describes the hosts
to
+     * be used and some options.
+     *
+     * <p>The format of the URI is:
+     *
+     * <pre>{@code
+     * mongodb://[username:password@]host1[:port1],...[,hostN[:portN]]][/[database][?options]]
+     * }</pre>
+     *
+     * <p>Where:
+     *   <ul>
+     *     <li>{@code mongodb://} is a required prefix to identify that this is a string
in the
+     *     standard connection format.</li>
+     *     <li>{@code username:password@} are optional. If given, the driver will attempt
to
+     *     login to a database after connecting to a database server. For some authentication
+     *     mechanisms, only the username is specified and the password is not, in which case
+     *     the ":" after the username is left off as well.</li>
+     *     <li>{@code host1} is the only required part of the URI. It identifies a
server
+     *     address to connect to.</li>
+     *     <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
+     *     <li>{@code /database} is the name of the database to login to and thus is
only
+     *     relevant if the {@code username:password@} syntax is used. If not specified, the
+     * "admin" database will be used by default. It has to be equivalent to the database
+     * you specify with {@link Write#withDatabase(String)}.</li>
+     *     <li>{@code ?options} are connection options. Note that if {@code database}
is absent
+     *     there is still a {@code /} required between the last {@code host} and the {@code
?}
+     *     introducing the options. Options are name=value pairs and the pairs are separated
by
+     *     "{@code &}". The {@code KeepAlive} connection option can't be passed via the
URI, instead
+     *     you have to use {@link Write#withKeepAlive(boolean)}. Same for the
+     *     {@code MaxConnectionIdleTime} connection option via
+     *     {@link Write#withMaxConnectionIdleTime(int)}.
+     *     </li>
+     *   </ul>
+     */
     public Write withUri(String uri) {
-      return toBuilder().setUri(uri).build();
+      checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri");
+      return builder().setUri(uri).build();
+    }
+
+    /**
+     * Sets whether socket keep alive is enabled.
+     */
+    public Write withKeepAlive(boolean keepAlive) {
+      return builder().setKeepAlive(keepAlive).build();
+    }
+
+    /**
+     * Sets the maximum idle time for a pooled connection.
+     */
+    public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) {
+      return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
     }
 
+    /**
+     * Sets the database to use.
+     */
     public Write withDatabase(String database) {
-      return toBuilder().setDatabase(database).build();
+      checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with
"
+          + "null database");
+      return builder().setDatabase(database).build();
     }
 
+    /**
+     * Sets the collection where to write data in the database.
+     */
     public Write withCollection(String collection) {
-      return toBuilder().setCollection(collection).build();
+      checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called
"
+          + "with null collection");
+      return builder().setCollection(collection).build();
     }
 
+    /**
+     * Define the size of the batch to group write operations.
+     */
     public Write withBatchSize(long batchSize) {
-      return toBuilder().setBatchSize(batchSize).build();
+      checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called
with "
+          + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize);
+      return builder().setBatchSize(batchSize).build();
     }
 
     @Override
@@ -466,10 +629,21 @@ public class MongoDbIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkNotNull(uri(), "uri");
-      checkNotNull(database(), "database");
-      checkNotNull(collection(), "collection");
-      checkNotNull(batchSize(), "batchSize");
+      checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)");
+      checkState(database() != null, "MongoDbIO.write() requires a database to be set via
"
+          + "withDatabase(database)");
+      checkState(collection() != null, "MongoDbIO.write() requires a collection to be set
via "
+          + "withCollection(collection)");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("uri", uri()));
+      builder.add(DisplayData.item("keepAlive", keepAlive()));
+      builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
+      builder.add(DisplayData.item("database", database()));
+      builder.add(DisplayData.item("collection", collection()));
+      builder.add(DisplayData.item("batchSize", batchSize()));
     }
 
     private static class WriteFn extends DoFn<Document, Void> {
@@ -483,7 +657,10 @@ public class MongoDbIO {
 
       @Setup
       public void createMongoClient() throws Exception {
-        client = new MongoClient(new MongoClientURI(spec.uri()));
+        MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
+        builder.socketKeepAlive(spec.keepAlive());
+        builder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
+        client = new MongoClient(new MongoClientURI(spec.uri(), builder));
       }
 
       @StartBundle

http://git-wip-us.apache.org/repos/asf/beam/blob/87be64e9/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index cd26b48..67dbca4 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
@@ -189,6 +190,42 @@ public class MongoDbIOTest implements Serializable {
   }
 
   @Test
+  public void testReadWithCustomConnectionOptions() throws Exception {
+    MongoDbIO.Read read = MongoDbIO.read()
+        .withUri("mongodb://localhost:" + port)
+        .withKeepAlive(false)
+        .withMaxConnectionIdleTime(10)
+        .withDatabase(DATABASE)
+        .withCollection(COLLECTION);
+    assertFalse(read.keepAlive());
+    assertEquals(10, read.maxConnectionIdleTime());
+
+    PCollection<Document> documents = pipeline.apply(read);
+
+    PAssert.thatSingleton(documents.apply("Count All", Count.<Document>globally()))
+        .isEqualTo(1000L);
+
+    PAssert.that(documents
+        .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String,
Void>>() {
+          public KV<String, Void> apply(Document input) {
+            return KV.of(input.getString("scientist"), null);
+          }
+        }))
+        .apply("Count Scientist", Count.<String, Void>perKey())
+    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>()
{
+      @Override
+      public Void apply(Iterable<KV<String, Long>> input) {
+        for (KV<String, Long> element : input) {
+          assertEquals(100L, element.getValue().longValue());
+        }
+        return null;
+      }
+    });
+
+    pipeline.run();
+  }
+
+  @Test
   public void testReadWithFilter() throws Exception {
 
     PCollection<Document> output = pipeline.apply(


Mime
View raw message