beam-commits mailing list archives

From lc...@apache.org
Subject [04/12] incubator-beam git commit: [BEAM-151] Break out Dataflow transport creation to another file
Date Thu, 07 Apr 2016 18:21:14 GMT
[BEAM-151] Break out Dataflow transport creation to another file

Keeping the Dataflow client factories in the shared Transport class was preventing DataflowPipelineOptions from moving into a Dataflow runner maven module; this change breaks them out into a new DataflowTransport class.
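
For illustration only (not part of the commit): after this change, code that previously called Transport.newDataflowClient switches to DataflowTransport. A minimal sketch of the new call site (the class name CallSiteSketch is made up; it assumes the SDK and its Google API client dependencies are on the classpath):

    import com.google.api.services.dataflow.Dataflow;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.DataflowTransport;

    public class CallSiteSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // Before this commit: Transport.newDataflowClient(options).build();
        // After: the Dataflow-specific factory lives in DataflowTransport.
        Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();
        System.out.println(dataflow.getApplicationName());
      }
    }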


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d0db477a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d0db477a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d0db477a

Branch: refs/heads/master
Commit: d0db477a0ce436728f71f0f4aec0b0098eac66be
Parents: c8cb55a
Author: Luke Cwik <lcwik@google.com>
Authored: Mon Mar 28 13:09:33 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu Apr 7 11:19:49 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   |   5 +-
 .../sdk/runners/DataflowPipelineRunner.java     |   6 +-
 .../dataflow/sdk/util/DataflowTransport.java    | 111 +++++++++++++++++++
 .../cloud/dataflow/sdk/util/Transport.java      |  46 --------
 .../sdk/options/GoogleApiDebugOptionsTest.java  |  93 ++++++++--------
 5 files changed, 163 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
index b55fa17..6231bd4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
@@ -20,11 +20,11 @@ package com.google.cloud.dataflow.sdk.options;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.GcsStager;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.Stager;
-import com.google.cloud.dataflow.sdk.util.Transport;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -159,7 +159,8 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
     @Override
     public Dataflow create(PipelineOptions options) {
-        return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
+        return DataflowTransport.newDataflowClient(
+            options.as(DataflowPipelineOptions.class)).build();
     }
   }
 

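For context: DataflowClientFactory is the DefaultValueFactory behind the dataflowClient option, so the hunk above changes which helper builds the client the first time getDataflowClient() is called. A hedged sketch of that lookup path (TestCredential as in the tests below; DefaultClientSketch is a made-up name):

    import com.google.api.services.dataflow.Dataflow;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.TestCredential;

    public class DefaultClientSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setGcpCredential(new TestCredential());
        // First access invokes DataflowClientFactory.create(options), which now
        // delegates to DataflowTransport.newDataflowClient(options).build().
        Dataflow client = options.getDataflowClient();
        System.out.println(client.getApplicationName());
      }
    }
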
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 5f43cc3..50ca36f 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -99,6 +99,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
@@ -107,7 +108,6 @@ import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
 import com.google.cloud.dataflow.sdk.util.Reshuffle;
 import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
-import com.google.cloud.dataflow.sdk.util.Transport;
 import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -444,7 +444,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
       throw new RuntimeException("Should not specify the debuggee");
     }
 
-    Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
+    Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build();
     Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
     options.setDebuggee(debuggee);
 
@@ -600,7 +600,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     // regularly and need not be retried automatically.
     DataflowPipelineJob dataflowPipelineJob =
         new DataflowPipelineJob(options.getProject(), jobResult.getId(),
-            Transport.newRawDataflowClient(options).build(), aggregatorTransforms);
+            DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms);
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
new file mode 100644
index 0000000..8de358c
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ ******************************************************************************/
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
+import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class DataflowTransport {
+
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost() +
+          (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString);
+    }
+  }
+
+  /**
+   * Returns a Google Cloud Dataflow client builder.
+   */
+  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
+    String servicePath = options.getDataflowEndpoint();
+    ApiComponents components;
+    if (servicePath.contains("://")) {
+      components = apiComponentsFromUrl(servicePath);
+    } else {
+      components = new ApiComponents(options.getApiRootUrl(), servicePath);
+    }
+
+    return new Dataflow.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setRootUrl(components.rootUrl)
+        .setServicePath(components.servicePath)
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    return new Clouddebugger.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Dataflow client that does not automatically retry failed
+   * requests.
+   */
+  public static Dataflow.Builder
+      newRawDataflowClient(DataflowPipelineOptions options) {
+    return newDataflowClient(options)
+        .setHttpRequestInitializer(options.getGcpCredential())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credential credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return httpRequestInitializer;
+    } else {
+      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
+    }
+  }
+}

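The private apiComponentsFromUrl helper above splits a full endpoint URL into the root URL and service path that Dataflow.Builder expects. A standalone re-statement of that parsing using only java.net.URL (the localhost endpoint is invented for illustration):

    import java.net.MalformedURLException;
    import java.net.URL;

    public class EndpointSplitSketch {
      public static void main(String[] args) throws MalformedURLException {
        // Mirrors DataflowTransport.apiComponentsFromUrl: protocol, host, and any
        // explicit port form the root URL; the remaining path is the service path.
        URL url = new URL("https://localhost:8443/v1b3/projects/");
        String rootUrl = url.getProtocol() + "://" + url.getHost()
            + (url.getPort() > 0 ? ":" + url.getPort() : "");
        System.out.println(rootUrl);       // prints https://localhost:8443
        System.out.println(url.getPath()); // prints /v1b3/projects/
      }
    }
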
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 187d164..5888822 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -24,13 +24,10 @@ import com.google.api.client.http.HttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.storage.Storage;
 import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.GcsOptions;
 import com.google.cloud.dataflow.sdk.options.PubsubOptions;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -128,49 +125,6 @@ public class Transport {
   }
 
   /**
-   * Returns a Google Cloud Dataflow client builder.
-   */
-  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
-    String servicePath = options.getDataflowEndpoint();
-    ApiComponents components;
-    if (servicePath.contains("://")) {
-      components = apiComponentsFromUrl(servicePath);
-    } else {
-      components = new ApiComponents(options.getApiRootUrl(), servicePath);
-    }
-
-    return new Dataflow.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setRootUrl(components.rootUrl)
-        .setServicePath(components.servicePath)
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
-    return new Clouddebugger.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Dataflow client that does not automatically retry failed
-   * requests.
-   */
-  public static Dataflow.Builder
-      newRawDataflowClient(DataflowPipelineOptions options) {
-    return newDataflowClient(options)
-        .setHttpRequestInitializer(options.getGcpCredential())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
    * Returns a Cloud Storage client builder.
    *
    * <p>Note: this client's endpoint is <b>not</b> modified by the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
index 3201608..c2f0bb8 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
@@ -22,8 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Create;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
+import com.google.api.services.storage.Storage;
 import com.google.cloud.dataflow.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.cloud.dataflow.sdk.util.Transport;
@@ -37,104 +36,104 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link GoogleApiDebugOptions}. */
 @RunWith(JUnit4.class)
 public class GoogleApiDebugOptionsTest {
+  private static final String STORAGE_GET_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
+  private static final String STORAGE_GET_AND_LIST_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
+      + "\"Objects.List\":\"ListTraceDestination\"}";
+  private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}";
+
   @Test
   public void testWhenTracingMatches() throws Exception {
-    String[] args =
-        new String[] {"--googleApiTrace={\"Projects.Jobs.Get\":\"GetTraceDestination\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
-
     assertNotNull(options.getGoogleApiTrace());
 
-    Get request =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get request =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("GetTraceDestination", request.get("$trace"));
   }
 
   @Test
   public void testWhenTracingDoesNotMatch() throws Exception {
-    String[] args = new String[] {"--googleApiTrace={\"Projects.Jobs.Create\":\"testToken\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get request =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.List request =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
     assertNull(request.get("$trace"));
   }
 
   @Test
   public void testWithMultipleTraces() throws Exception {
-    String[] args = new String[] {
-        "--googleApiTrace={\"Projects.Jobs.Create\":\"CreateTraceDestination\","
-        + "\"Projects.Jobs.Get\":\"GetTraceDestination\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+    String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("GetTraceDestination", getRequest.get("$trace"));
 
-    Create createRequest =
-        options.getDataflowClient().projects().jobs().create("testProjectId", null);
-    assertEquals("CreateTraceDestination", createRequest.get("$trace"));
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("ListTraceDestination", listRequest.get("$trace"));
   }
 
   @Test
-  public void testMatchingAllDataflowCalls() throws Exception {
-    String[] args = new String[] {"--googleApiTrace={\"Dataflow\":\"TraceDestination\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+  public void testMatchingAllCalls() throws Exception {
+    String[] args = new String[] {STORAGE_TRACE};
+    GcsOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Create createRequest =
-        options.getDataflowClient().projects().jobs().create("testProjectId", null);
-    assertEquals("TraceDestination", createRequest.get("$trace"));
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("TraceDestination", listRequest.get("$trace"));
   }
 
   @Test
   public void testMatchingAgainstClient() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
     options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
-        Transport.newDataflowClient(options).build(), "TraceDestination"));
+        Transport.newStorageClient(options).build(), "TraceDestination"));
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Delete deleteRequest = Transport.newBigQueryClient(options).build().datasets()
-        .delete("testProjectId", "testDatasetId");
+    Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class))
+        .build().datasets().delete("testProjectId", "testDatasetId");
     assertNull(deleteRequest.get("$trace"));
   }
 
   @Test
   public void testMatchingAgainstRequestType() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
     options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
-        Transport.newDataflowClient(options).build().projects().jobs()
-            .get("aProjectId", "aJobId"), "TraceDestination"));
+        Transport.newStorageClient(options).build().objects()
+            .get("aProjectId", "aObjectId"), "TraceDestination"));
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Create createRequest =
-        options.getDataflowClient().projects().jobs().create("testProjectId", null);
-    assertNull(createRequest.get("$trace"));
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertNull(listRequest.get("$trace"));
   }
 
   @Test

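As the rewritten tests above show, GoogleApiTracer attaches a "$trace" query parameter to requests whose client or method matches the --googleApiTrace pattern. A minimal sketch of that behavior outside JUnit, mirroring testWhenTracingMatches (TraceFlagSketch and the bucket/object names are made up):

    import com.google.api.services.storage.Storage;
    import com.google.cloud.dataflow.sdk.options.GcsOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.TestCredential;
    import com.google.cloud.dataflow.sdk.util.Transport;

    public class TraceFlagSketch {
      public static void main(String[] args) throws Exception {
        String[] flags =
            {"--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"};
        GcsOptions options =
            PipelineOptionsFactory.fromArgs(flags).as(GcsOptions.class);
        options.setGcpCredential(new TestCredential());
        // Building a matching request stamps it with the configured destination;
        // nothing is sent over the network until execute() is called.
        Storage.Objects.Get request =
            Transport.newStorageClient(options).build().objects().get("bucket", "object");
        System.out.println(request.get("$trace")); // prints GetTraceDestination
      }
    }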
