beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Add localhost option for DatastoreIO
Date Fri, 03 Feb 2017 10:47:47 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8b0449450 -> 8ee3572b4


Add localhost option for DatastoreIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da8d0dd8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da8d0dd8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da8d0dd8

Branch: refs/heads/master
Commit: da8d0dd800bc0508b1f5b728211147de8ad9d086
Parents: 8b04494
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Fri Oct 28 15:31:38 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Feb 3 02:05:22 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 147 ++++++++++++-------
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  15 +-
 .../sdk/io/gcp/datastore/SplitQueryFnIT.java    |   2 +-
 3 files changed, 107 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/da8d0dd8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 1e8271c..4a219aa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -93,7 +93,8 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link DatastoreV1} provides an API to Read, Write and Delete {@link PCollection PCollections}
  * of <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a>
version v1
- * {@link Entity} objects.
+ * {@link Entity} objects. Read is only supported for Bounded PCollections while Write and
Delete
+ * are supported for both Bounded and Unbounded PCollections.
  *
  * <p>This API currently requires an authentication workaround. To use {@link DatastoreV1},
users
  * must use the {@code gcloud} command line tool to get credentials for Cloud Datastore:
@@ -124,8 +125,10 @@ import org.slf4j.LoggerFactory;
  *
  * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud
Datastore in parallel across
  * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)}, then
- * all returned results will be read by a single Dataflow worker in order to ensure correct
data.
+ * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)} or if the Query contains
+ * inequality filters like {@code GREATER_THAN, LESS_THAN} etc., then all returned results
+ * will be read by a single worker in order to ensure correct data. Since data is read from
+ * a single worker, this could have a significant impact on the performance of the job.
  *
  * <p>To write a {@link PCollection} to a Cloud Datastore, use {@link DatastoreV1#write},
  * specifying the Cloud Datastore project to write to:
@@ -176,6 +179,10 @@ import org.slf4j.LoggerFactory;
  * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud
Datastore Sign Up
  * </a>for security and permission related information specific to Cloud Datastore.
  *
+ * <p>Optionally, Cloud Datastore V1 Emulator, running locally, could be used for testing
purposes
+ * by providing the host port information through {@code withLocalhost("host:port")} for all
the
+ * above transforms. In such a case, all the Cloud Datastore API calls are directed to the
Emulator.
+ *
  * @see org.apache.beam.sdk.runners.PipelineRunner
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
@@ -231,6 +238,7 @@ public class DatastoreV1 {
     @Nullable public abstract Query getQuery();
     @Nullable public abstract String getNamespace();
     public abstract int getNumQuerySplits();
+    @Nullable public abstract String getLocalhost();
 
     @Override
     public abstract String toString();
@@ -243,6 +251,7 @@ public class DatastoreV1 {
       abstract Builder setQuery(Query query);
       abstract Builder setNamespace(String namespace);
       abstract Builder setNumQuerySplits(int numQuerySplits);
+      abstract Builder setLocalhost(String localhost);
       abstract Read build();
     }
 
@@ -410,10 +419,20 @@ public class DatastoreV1 {
           .build();
     }
 
+    /**
+     * Returns a new {@link DatastoreV1.Read} that reads from a Datastore Emulator running
at the
+     * given localhost address.
+     */
+    public DatastoreV1.Read withLocalhost(String localhost) {
+      return toBuilder()
+          .setLocalhost(localhost)
+          .build();
+    }
+
     @Override
     public PCollection<Entity> expand(PBegin input) {
       V1Options v1Options = V1Options.from(getProjectId(), getQuery(),
-          getNamespace());
+          getNamespace(), getLocalhost());
 
       /*
        * This composite transform involves the following steps:
@@ -469,34 +488,17 @@ public class DatastoreV1 {
      * A class for v1 Cloud Datastore related options.
      */
     @VisibleForTesting
-    static class V1Options implements Serializable {
-      private final Query query;
-      private final String projectId;
-      @Nullable
-      private final String namespace;
-
-      private V1Options(String projectId, Query query, @Nullable String namespace) {
-        this.projectId = checkNotNull(projectId, "projectId");
-        this.query = checkNotNull(query, "query");
-        this.namespace = namespace;
+    @AutoValue
+    abstract static class V1Options implements Serializable {
+      public static V1Options from(String projectId, Query query, @Nullable String namespace,
+                                   @Nullable String localhost) {
+        return new AutoValue_DatastoreV1_Read_V1Options(projectId, query, namespace, localhost);
       }
 
-      public static V1Options from(String projectId, Query query, @Nullable String namespace)
{
-        return new V1Options(projectId, query, namespace);
-      }
-
-      public Query getQuery() {
-        return query;
-      }
-
-      public String getProjectId() {
-        return projectId;
-      }
-
-      @Nullable
-      public String getNamespace() {
-        return namespace;
-      }
+      public abstract String getProjectId();
+      public abstract Query getQuery();
+      @Nullable public abstract String getNamespace();
+      @Nullable public abstract String getLocalhost();
     }
 
     /**
@@ -529,7 +531,8 @@ public class DatastoreV1 {
 
       @StartBundle
       public void startBundle(Context c) throws Exception {
-        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(),
+            options.getLocalhost());
         querySplitter = datastoreFactory.getQuerySplitter();
       }
 
@@ -603,7 +606,8 @@ public class DatastoreV1 {
 
       @StartBundle
       public void startBundle(Context c) throws Exception {
-        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(),
+            options.getLocalhost());
       }
 
       /** Read and output entities for the given query. */
@@ -664,7 +668,7 @@ public class DatastoreV1 {
    * {@code projectId} using {@link DatastoreV1.Write#withProjectId}.
    */
   public Write write() {
-    return new Write(null);
+    return new Write(null, null);
   }
 
   /**
@@ -672,7 +676,7 @@ public class DatastoreV1 {
    * {@code projectId} using {@link DeleteEntity#withProjectId}.
    */
   public DeleteEntity deleteEntity() {
-    return new DeleteEntity(null);
+    return new DeleteEntity(null, null);
   }
 
   /**
@@ -680,7 +684,7 @@ public class DatastoreV1 {
    * {@code projectId} using {@link DeleteKey#withProjectId}.
    */
   public DeleteKey deleteKey() {
-    return new DeleteKey(null);
+    return new DeleteKey(null, null);
   }
 
   /**
@@ -693,8 +697,8 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of build order,
but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    Write(@Nullable String projectId) {
-      super(projectId, new UpsertFn());
+    Write(@Nullable String projectId, @Nullable String localhost) {
+      super(projectId, localhost, new UpsertFn());
     }
 
     /**
@@ -702,7 +706,16 @@ public class DatastoreV1 {
      */
     public Write withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new Write(projectId);
+      return new Write(projectId, null);
+    }
+
+    /**
+     * Returns a new {@link Write} that writes to the Cloud Datastore Emulator running locally
on
+     * the specified host port.
+     */
+    public Write withLocalhost(String localhost) {
+      checkNotNull(localhost, "localhost");
+      return new Write(null, localhost);
     }
   }
 
@@ -716,8 +729,8 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of build order,
but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    DeleteEntity(@Nullable String projectId) {
-      super(projectId, new DeleteEntityFn());
+    DeleteEntity(@Nullable String projectId, @Nullable String localhost) {
+      super(projectId, localhost, new DeleteEntityFn());
     }
 
     /**
@@ -726,7 +739,16 @@ public class DatastoreV1 {
      */
     public DeleteEntity withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new DeleteEntity(projectId);
+      return new DeleteEntity(projectId, null);
+    }
+
+    /**
+     * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore
Emulator
+     * running locally on the specified host port.
+     */
+    public DeleteEntity withLocalhost(String localhost) {
+      checkNotNull(localhost, "localhost");
+      return new DeleteEntity(null, localhost);
     }
   }
 
@@ -741,8 +763,8 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of build order,
but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    DeleteKey(@Nullable String projectId) {
-      super(projectId, new DeleteKeyFn());
+    DeleteKey(@Nullable String projectId, @Nullable String localhost) {
+      super(projectId, localhost, new DeleteKeyFn());
     }
 
     /**
@@ -751,7 +773,16 @@ public class DatastoreV1 {
      */
     public DeleteKey withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new DeleteKey(projectId);
+      return new DeleteKey(projectId, null);
+    }
+
+    /**
+     * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore Emulator
+     * running locally on the specified host port.
+     */
+    public DeleteKey withLocalhost(String localhost) {
+      checkNotNull(localhost, "localhost");
+      return new DeleteKey(null, localhost);
     }
   }
 
@@ -766,6 +797,8 @@ public class DatastoreV1 {
   private abstract static class Mutate<T> extends PTransform<PCollection<T>,
PDone> {
     @Nullable
     private final String projectId;
+    @Nullable
+    private final String localhost;
     /** A function that transforms each {@code T} into a mutation. */
     private final SimpleFunction<T, Mutation> mutationFn;
 
@@ -773,15 +806,18 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of build order,
but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
+    Mutate(@Nullable String projectId, @Nullable String localhost,
+        SimpleFunction<T, Mutation> mutationFn) {
       this.projectId = projectId;
+      this.localhost = localhost;
       this.mutationFn = checkNotNull(mutationFn);
     }
 
     @Override
     public PDone expand(PCollection<T> input) {
       input.apply("Convert to Mutation", MapElements.via(mutationFn))
-          .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
+          .apply("Write Mutation to Datastore", ParDo.of(
+              new DatastoreWriterFn(projectId, localhost)));
 
       return PDone.in(input.getPipeline());
     }
@@ -832,6 +868,8 @@ public class DatastoreV1 {
   static class DatastoreWriterFn extends DoFn<Mutation, Void> {
     private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
     private final String projectId;
+    @Nullable
+    private final String localhost;
     private transient Datastore datastore;
     private final V1DatastoreFactory datastoreFactory;
     // Current batch of mutations to be written.
@@ -842,19 +880,21 @@ public class DatastoreV1 {
         FluentBackoff.DEFAULT
             .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
-    DatastoreWriterFn(String projectId) {
-      this(projectId, new V1DatastoreFactory());
+    DatastoreWriterFn(String projectId, @Nullable String localhost) {
+      this(projectId, localhost, new V1DatastoreFactory());
     }
 
     @VisibleForTesting
-    DatastoreWriterFn(String projectId, V1DatastoreFactory datastoreFactory) {
+    DatastoreWriterFn(String projectId, @Nullable String localhost,
+                      V1DatastoreFactory datastoreFactory) {
       this.projectId = checkNotNull(projectId, "projectId");
+      this.localhost = localhost;
       this.datastoreFactory = datastoreFactory;
     }
 
     @StartBundle
     public void startBundle(Context c) {
-      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId, localhost);
     }
 
     @ProcessElement
@@ -1008,7 +1048,8 @@ public class DatastoreV1 {
   static class V1DatastoreFactory implements Serializable {
 
     /** Builds a Cloud Datastore client for the given pipeline options and project. */
-    public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+    public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId,
+        @Nullable String localhost) {
       Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
       HttpRequestInitializer initializer;
       if (credential != null) {
@@ -1024,6 +1065,10 @@ public class DatastoreV1 {
               .projectId(projectId)
               .initializer(initializer);
 
+      if (localhost != null) {
+        builder.localHost(localhost);
+      }
+
       return DatastoreFactory.get().create(builder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/da8d0dd8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index dd1904a..c2bc8d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -110,12 +110,13 @@ public class DatastoreV1Test {
   private static final String NAMESPACE = "testNamespace";
   private static final String KIND = "testKind";
   private static final Query QUERY;
+  private static final String LOCALHOST = "localhost:9955";
   private static final V1Options V_1_OPTIONS;
   static {
     Query.Builder q = Query.newBuilder();
     q.addKindBuilder().setName(KIND);
     QUERY = q.build();
-    V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE);
+    V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE, null);
   }
   private DatastoreV1.Read initialRead;
 
@@ -136,7 +137,8 @@ public class DatastoreV1Test {
     initialRead = DatastoreIO.v1().read()
         .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
 
-    when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
+    when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class),
+        any(String.class)))
         .thenReturn(mockDatastore);
     when(mockDatastoreFactory.getQuerySplitter())
         .thenReturn(mockQuerySplitter);
@@ -157,10 +159,12 @@ public class DatastoreV1Test {
   @Test
   public void testBuildReadAlt() throws Exception {
     DatastoreV1.Read read =  DatastoreIO.v1().read()
-        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
+        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY)
+        .withLocalhost(LOCALHOST);
     assertEquals(QUERY, read.getQuery());
     assertEquals(PROJECT_ID, read.getProjectId());
     assertEquals(NAMESPACE, read.getNamespace());
+    assertEquals(LOCALHOST, read.getLocalhost());
   }
 
   @Test
@@ -504,7 +508,7 @@ public class DatastoreV1Test {
 
   @Test
   public void testDatastoreWriteFnDisplayData() {
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null);
     DisplayData displayData = DisplayData.from(datastoreWriter);
     assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
   }
@@ -539,7 +543,8 @@ public class DatastoreV1Test {
           makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
     }
 
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null,
+        mockDatastoreFactory);
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);

http://git-wip-us.apache.org/repos/asf/beam/blob/da8d0dd8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
index 4dd1608..49a60c6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -86,7 +86,7 @@ public class SplitQueryFnIT {
     query.addKindBuilder().setName(kind);
 
     SplitQueryFn splitQueryFn = new SplitQueryFn(
-        V1Options.from(projectId, query.build(), namespace), 0);
+        V1Options.from(projectId, query.build(), namespace, null), 0);
     DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
 
     List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build());


Mime
View raw message